// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

use super::*;
use assert_matches::assert_matches;
use polkadot_erasure_coding::{branches, obtain_chunks_v1 as obtain_chunks};
use polkadot_node_network_protocol::{view, ObservedRole, our_view};
use polkadot_node_subsystem_util::TimeoutExt;
use polkadot_primitives::v1::{
	AvailableData, BlockData, CandidateCommitments, CandidateDescriptor, GroupIndex,
	GroupRotationInfo, HeadData, OccupiedCore, PersistedValidationData, PoV, ScheduledCore, Id as ParaId,
	CommittedCandidateReceipt,
};
use polkadot_subsystem_testhelpers as test_helpers;

use futures::{executor, future, Future};
use sc_keystore::LocalKeystore;
use sp_application_crypto::AppKey;
use sp_keystore::{SyncCryptoStore, SyncCryptoStorePtr};
use sp_keyring::Sr25519Keyring;
use std::{sync::Arc, time::Duration};
use maplit::hashmap;

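/// Construct a [`View`] from a list of head hashes.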
macro_rules! view {
	( $( $hash:expr ),* $(,)? ) => {
		// Finalized number unimportant for availability distribution.
		View::new(vec![ $( $hash.clone() ),* ], 0)
	};
}

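/// Wrap an [`AvailabilityGossipMessage`] into the corresponding v1 network
/// protocol message.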
fn chunk_protocol_message(
	message: AvailabilityGossipMessage,
) -> protocol_v1::AvailabilityDistributionMessage {
	protocol_v1::AvailabilityDistributionMessage::Chunk(
		message.candidate_hash,
		message.erasure_chunk,
	)
}

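/// An empty [`PerCandidate`] with tracing disabled.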
fn make_per_candidate() -> PerCandidate {
	PerCandidate {
		live_in: HashSet::new(),
		message_vault: HashMap::new(),
		received_messages: HashMap::new(),
		sent_messages: HashMap::new(),
		validators: Vec::new(),
		validator_index: None,
		descriptor: Default::default(),
		span: jaeger::Span::Disabled,
	}
}

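/// Handle for driving the subsystem under test through a mocked overseer.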
struct TestHarness {
	virtual_overseer: test_helpers::TestSubsystemContextHandle<AvailabilityDistributionMessage>,
}

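/// Run `test_fx` against a freshly spawned subsystem instance and return the
/// resulting [`ProtocolState`] for inspection.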
fn test_harness<T: Future<Output = ()>>(
	keystore: SyncCryptoStorePtr,
	test_fx: impl FnOnce(TestHarness) -> T,
) -> ProtocolState {
	sp_tracing::try_init_simple();

	let pool = sp_core::testing::TaskExecutor::new();
	let (context, virtual_overseer) = test_helpers::make_subsystem_context(pool.clone());

	let subsystem = AvailabilityDistributionSubsystem::new(keystore, Default::default());
	let mut state = ProtocolState::default();
	{
		let subsystem = subsystem.run_inner(context, &mut state);

		let test_fut = test_fx(TestHarness { virtual_overseer });

		futures::pin_mut!(test_fut);
		futures::pin_mut!(subsystem);

		executor::block_on(future::select(test_fut, subsystem));
	}

	state
}

async fn overseer_send(
	overseer: &mut test_helpers::TestSubsystemContextHandle<AvailabilityDistributionMessage>,
	msg: impl Into<AvailabilityDistributionMessage>,
) {
	let msg = msg.into();
	tracing::trace!(msg = ?msg, "sending message");
	overseer.send(FromOverseer::Communication { msg }).await
}

async fn overseer_recv(
	overseer: &mut test_helpers::TestSubsystemContextHandle<AvailabilityDistributionMessage>,
) -> AllMessages {
	tracing::trace!("waiting for message ...");
	let msg = overseer.recv().await;
	tracing::trace!(msg = ?msg, "received message");
	msg
}

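/// An `Occupied` core state referring to the given candidate.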
fn occupied_core_from_candidate(receipt: &CommittedCandidateReceipt) -> CoreState {
	CoreState::Occupied(OccupiedCore {
		next_up_on_available: None,
		occupied_since: 0,
		time_out_at: 5,
		next_up_on_time_out: None,
		availability: Default::default(),
		group_responsible: GroupIndex::from(0),
		candidate_hash: receipt.hash(),
		candidate_descriptor: receipt.descriptor().clone(),
	})
}

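/// Common test fixture: two parachains, five validators (this node is Ferdie)
/// and one candidate with PoV block per chain.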
#[derive(Clone)]
struct TestState {
	chain_ids: Vec<ParaId>,
	validators: Vec<Sr25519Keyring>,
	validator_public: Vec<ValidatorId>,
	validator_groups: (Vec<Vec<ValidatorIndex>>, GroupRotationInfo),
	head_data: HashMap<ParaId, HeadData>,
	keystore: SyncCryptoStorePtr,
	relay_parent: Hash,
	ancestors: Vec<Hash>,
	availability_cores: Vec<CoreState>,
	persisted_validation_data: PersistedValidationData,
	candidates: Vec<CommittedCandidateReceipt>,
	pov_blocks: Vec<PoV>,
}

fn validator_pubkeys(val_ids: &[Sr25519Keyring]) -> Vec<ValidatorId> {
	val_ids.iter().map(|v| v.public().into()).collect()
}

impl Default for TestState {
	fn default() -> Self {
		let chain_a = ParaId::from(1);
		let chain_b = ParaId::from(2);

		let chain_ids = vec![chain_a, chain_b];

		let validators = vec![
			Sr25519Keyring::Ferdie, // <- this node, role: validator
			Sr25519Keyring::Alice,
			Sr25519Keyring::Bob,
			Sr25519Keyring::Charlie,
			Sr25519Keyring::Dave,
		];

		let keystore: SyncCryptoStorePtr = Arc::new(LocalKeystore::in_memory());

		SyncCryptoStore::sr25519_generate_new(
			&*keystore,
			ValidatorId::ID,
			Some(&validators[0].to_seed()),
		)
		.expect("Insert key into keystore");

		let validator_public = validator_pubkeys(&validators);

		let validator_groups = vec![vec![2, 0, 4], vec![1], vec![3]];
		let group_rotation_info = GroupRotationInfo {
			session_start_block: 0,
			group_rotation_frequency: 100,
			now: 1,
		};
		let validator_groups = (validator_groups, group_rotation_info);

		let availability_cores = vec![
			CoreState::Scheduled(ScheduledCore {
				para_id: chain_ids[0],
				collator: None,
			}),
			CoreState::Scheduled(ScheduledCore {
				para_id: chain_ids[1],
				collator: None,
			}),
		];

		let mut head_data = HashMap::new();
		head_data.insert(chain_a, HeadData(vec![4, 5, 6]));
		head_data.insert(chain_b, HeadData(vec![7, 8, 9]));

		let ancestors = vec![
			Hash::repeat_byte(0x44),
			Hash::repeat_byte(0x33),
			Hash::repeat_byte(0x22),
		];
		let relay_parent = Hash::repeat_byte(0x05);

		let persisted_validation_data = PersistedValidationData {
			parent_head: HeadData(vec![7, 8, 9]),
			relay_parent_number: Default::default(),
			max_pov_size: 1024,
			relay_parent_storage_root: Default::default(),
		};

		let pov_block_a = PoV {
			block_data: BlockData(vec![42, 43, 44]),
		};

		let pov_block_b = PoV {
			block_data: BlockData(vec![45, 46, 47]),
		};

		let candidates = vec![
			TestCandidateBuilder {
				para_id: chain_ids[0],
				relay_parent,
				pov_hash: pov_block_a.hash(),
				erasure_root: make_erasure_root(persisted_validation_data.clone(), validators.len(), pov_block_a.clone()),
				head_data: head_data.get(&chain_ids[0]).unwrap().clone(),
				..Default::default()
			}
			.build(),
			TestCandidateBuilder {
				para_id: chain_ids[1],
				relay_parent,
				pov_hash: pov_block_b.hash(),
				erasure_root: make_erasure_root(persisted_validation_data.clone(), validators.len(), pov_block_b.clone()),
				head_data: head_data.get(&chain_ids[1]).unwrap().clone(),
				..Default::default()
			}
			.build(),
		];

		let pov_blocks = vec![pov_block_a, pov_block_b];

		Self {
			chain_ids,
			keystore,
			validators,
			validator_public,
			validator_groups,
			availability_cores,
			head_data,
			persisted_validation_data,
			relay_parent,
			ancestors,
			candidates,
			pov_blocks,
		}
	}
}

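/// Bundle validation data and PoV into the [`AvailableData`] that gets erasure-coded.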
fn make_available_data(validation_data: PersistedValidationData, pov: PoV) -> AvailableData {
	AvailableData {
		validation_data,
		pov: Arc::new(pov),
	}
}

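/// Erasure-code the data for `validator_count` validators and return the merkle
/// root over the resulting chunks.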
fn make_erasure_root(persisted: PersistedValidationData, validator_count: usize, pov: PoV) -> Hash {
	let available_data = make_available_data(persisted, pov);

	let chunks = obtain_chunks(validator_count, &available_data).unwrap();
	branches(&chunks).root()
}

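/// Erasure-code the data and return all chunks, each carrying its merkle proof.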
fn make_erasure_chunks(persisted: PersistedValidationData, validator_count: usize, pov: PoV) -> Vec<ErasureChunk> {
	let available_data = make_available_data(persisted, pov);

	derive_erasure_chunks_with_proofs(validator_count, &available_data)
}

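/// A valid gossip message carrying chunk `erasure_chunk_index` of the candidate
/// at index `candidate` of the test state.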
fn make_valid_availability_gossip(
	test: &TestState,
	candidate: usize,
	erasure_chunk_index: u32,
) -> AvailabilityGossipMessage {
	let erasure_chunks = make_erasure_chunks(
		test.persisted_validation_data.clone(),
		test.validator_public.len(),
		test.pov_blocks[candidate].clone(),
	);

	let erasure_chunk: ErasureChunk = erasure_chunks
		.get(erasure_chunk_index as usize)
		.expect("Must be valid or input is oob")
		.clone();

	AvailabilityGossipMessage {
		candidate_hash: test.candidates[candidate].hash(),
		erasure_chunk,
	}
}

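/// Builder for a [`CommittedCandidateReceipt`]; unset fields are defaulted.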
#[derive(Default)]
struct TestCandidateBuilder {
	para_id: ParaId,
	head_data: HeadData,
	pov_hash: Hash,
	relay_parent: Hash,
	erasure_root: Hash,
}

impl TestCandidateBuilder {
	fn build(self) -> CommittedCandidateReceipt {
		CommittedCandidateReceipt {
			descriptor: CandidateDescriptor {
				para_id: self.para_id,
				pov_hash: self.pov_hash,
				relay_parent: self.relay_parent,
				erasure_root: self.erasure_root,
				..Default::default()
			},
			commitments: CandidateCommitments {
				head_data: self.head_data,
				..Default::default()
			},
		}
	}
}

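/// Sanity check that the gossip helpers above produce chunks which verify
/// against the candidate's erasure root.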
#[test]
fn helper_integrity() {
	let test_state = TestState::default();

	let message = make_valid_availability_gossip(
		&test_state,
		0,
		2,
	);

	let root = &test_state.candidates[0].descriptor.erasure_root;

	let anticipated_hash = branch_hash(
		root,
		&message.erasure_chunk.proof,
		message.erasure_chunk.index as usize,
	)
	.expect("Must be able to derive branch hash");
	assert_eq!(
		anticipated_hash,
		BlakeTwo256::hash(&message.erasure_chunk.chunk)
	);
}

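/// Obtain the erasure chunks for `available_data` and attach a merkle proof to
/// each of them.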
fn derive_erasure_chunks_with_proofs(
	n_validators: usize,
	available_data: &AvailableData,
) -> Vec<ErasureChunk> {
	let chunks: Vec<Vec<u8>> = obtain_chunks(n_validators, available_data).unwrap();

	// create proofs for each erasure chunk
	let branches = branches(chunks.as_ref());

	let erasure_chunks = branches
		.enumerate()
		.map(|(index, (proof, chunk))| ErasureChunk {
			chunk: chunk.to_vec(),
			index: index as _,
			proof,
		})
		.collect::<Vec<ErasureChunk>>();

	erasure_chunks
}

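/// Expect the subsystem to send the given chunks to the network bridge, each
/// chunk addressed to the peer set at the same position in `peers`.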
async fn expect_chunks_network_message(
	virtual_overseer: &mut test_helpers::TestSubsystemContextHandle<AvailabilityDistributionMessage>,
	peers: &[Vec<PeerId>],
	candidates: &[CandidateHash],
	chunks: &[ErasureChunk],
) {
	if chunks.is_empty() { return }

	assert_matches!(
		overseer_recv(virtual_overseer).await,
		AllMessages::NetworkBridge(
			NetworkBridgeMessage::SendValidationMessages(msgs)
		) => {
			assert_eq!(msgs.len(), chunks.len());
			for (send_peers, msg) in msgs {
				assert_matches!(
					msg,
					protocol_v1::ValidationProtocol::AvailabilityDistribution(
						protocol_v1::AvailabilityDistributionMessage::Chunk(send_candidate, send_chunk)
					) => {
						let i = chunks.iter().position(|c| c == &send_chunk).unwrap();
						assert!(candidates.contains(&send_candidate), "Could not find candidate: {:?}", send_candidate);
						assert_eq!(&peers[i], &send_peers);
					}
				);
			}
		}
	)
}

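/// Simulate an `OurViewChange` and answer the requests the subsystem is
/// expected to make in response: validators, ancestors, session indices,
/// availability cores, availability/chunk queries, and finally the outgoing
/// chunk messages.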
async fn change_our_view(
	virtual_overseer: &mut test_helpers::TestSubsystemContextHandle<AvailabilityDistributionMessage>,
	view: OurView,
	validator_public: &[ValidatorId],
	ancestors: Vec<Hash>,
	session_per_relay_parent: HashMap<Hash, SessionIndex>,
	availability_cores_per_relay_parent: HashMap<Hash, Vec<CoreState>>,
	data_availability: HashMap<CandidateHash, bool>,
	chunk_data_per_candidate: HashMap<CandidateHash, (PoV, PersistedValidationData)>,
	send_chunks_to: HashMap<CandidateHash, Vec<PeerId>>,
) {
	overseer_send(virtual_overseer, NetworkBridgeEvent::OurViewChange(view.clone())).await;

	// obtain the validators per relay parent
	assert_matches!(
		overseer_recv(virtual_overseer).await,
		AllMessages::RuntimeApi(RuntimeApiMessage::Request(
			relay_parent,
			RuntimeApiRequest::Validators(tx),
		)) => {
			assert!(view.contains(&relay_parent));
			tx.send(Ok(validator_public.to_vec())).unwrap();
		}
	);

	// query of k ancestors; answer with the ancestors the test provides
	assert_matches!(
		overseer_recv(virtual_overseer).await,
		AllMessages::ChainApi(ChainApiMessage::Ancestors {
			hash: relay_parent,
			k,
			response_channel: tx,
		}) => {
			assert!(view.contains(&relay_parent));
			assert_eq!(k, AvailabilityDistributionSubsystem::K + 1);
			tx.send(Ok(ancestors.clone())).unwrap();
		}
	);

	for _ in 0..session_per_relay_parent.len() {
		assert_matches!(
			overseer_recv(virtual_overseer).await,
			AllMessages::RuntimeApi(RuntimeApiMessage::Request(
				relay_parent,
				RuntimeApiRequest::SessionIndexForChild(tx)
			)) => {
				let index = session_per_relay_parent.get(&relay_parent)
					.expect(&format!("Session index for relay parent {:?} does not exist", relay_parent));
				tx.send(Ok(*index)).unwrap();
			}
		);
	}

	for _ in 0..availability_cores_per_relay_parent.len() {
		assert_matches!(
			overseer_recv(virtual_overseer).await,
			AllMessages::RuntimeApi(RuntimeApiMessage::Request(
				relay_parent,
				RuntimeApiRequest::AvailabilityCores(tx)
			)) => {
				let cores = availability_cores_per_relay_parent.get(&relay_parent)
					.expect(&format!("Availability core for relay parent {:?} does not exist", relay_parent));

				tx.send(Ok(cores.clone())).unwrap();
			}
		);
	}

	let mut send_peers = Vec::new();
	let mut send_chunks = Vec::new();
	let mut candidates = Vec::new();
	for _ in 0..data_availability.len() {
		let (available, candidate_hash) = assert_matches!(
			overseer_recv(virtual_overseer).await,
			AllMessages::AvailabilityStore(
				AvailabilityStoreMessage::QueryDataAvailability(
					candidate_hash,
					tx,
				)
			) => {
				let available = data_availability.get(&candidate_hash)
					.expect(&format!("No data availability for candidate {:?}", candidate_hash));

				tx.send(*available).unwrap();
				(available, candidate_hash)
			}
		);

		if !available {
			continue;
		}

		candidates.push(candidate_hash);
		if let Some((pov, persisted)) = chunk_data_per_candidate.get(&candidate_hash) {
			let chunks = make_erasure_chunks(persisted.clone(), validator_public.len(), pov.clone());

			for _ in 0..chunks.len() {
				let chunk = assert_matches!(
					overseer_recv(virtual_overseer).await,
					AllMessages::AvailabilityStore(
						AvailabilityStoreMessage::QueryChunk(
							candidate_hash,
							index,
							tx,
						)
					) => {
						tracing::trace!("Query chunk {} for candidate {:?}", index, candidate_hash);
						let chunk = chunks[index as usize].clone();
						tx.send(Some(chunk.clone())).unwrap();
						chunk
					}
				);

				if let Some(peers) = send_chunks_to.get(&candidate_hash) {
					send_peers.push(peers.clone());
					send_chunks.push(chunk);
				}
			}

		}
	}

	expect_chunks_network_message(virtual_overseer, &send_peers, &candidates, &send_chunks).await;
}

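/// Connect `peer` with `ObservedRole::Full` and announce its view.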
async fn setup_peer_with_view(
	virtual_overseer: &mut test_helpers::TestSubsystemContextHandle<AvailabilityDistributionMessage>,
	peer: PeerId,
	view: View,
) {
	overseer_send(virtual_overseer, NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full)).await;

	overseer_send(virtual_overseer, NetworkBridgeEvent::PeerViewChange(peer, view)).await;
}

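/// Deliver a chunk gossip message from `peer` and assert that it triggers the
/// expected reputation change.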
async fn peer_send_message(
	virtual_overseer: &mut test_helpers::TestSubsystemContextHandle<AvailabilityDistributionMessage>,
	peer: PeerId,
	message: AvailabilityGossipMessage,
	expected_reputation_change: Rep,
) {
	overseer_send(virtual_overseer, NetworkBridgeEvent::PeerMessage(peer.clone(), chunk_protocol_message(message))).await;

	assert_matches!(
		overseer_recv(virtual_overseer).await,
		AllMessages::NetworkBridge(
			NetworkBridgeMessage::ReportPeer(
				rep_peer,
				rep,
			)
		) => {
			assert_eq!(peer, rep_peer);
			assert_eq!(expected_reputation_change, rep);
		}
	);
}

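/// Our own view and the views of connected peers must end up in the protocol state.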
#[test]
fn check_views() {
	let test_state = TestState::default();

	let peer_a = PeerId::random();
	let peer_a_2 = peer_a.clone();
	let peer_b = PeerId::random();
	let peer_b_2 = peer_b.clone();
	assert_ne!(&peer_a, &peer_b);

	let keystore = test_state.keystore.clone();
	let current = test_state.relay_parent;
	let ancestors = test_state.ancestors.clone();

	let state = test_harness(keystore, move |test_harness| async move {
		let mut virtual_overseer = test_harness.virtual_overseer;

		let TestState {
			validator_public,
			relay_parent: current,
			ancestors,
			candidates,
			pov_blocks,
			..
		} = test_state.clone();

		let genesis = Hash::repeat_byte(0xAA);
		change_our_view(
			&mut virtual_overseer,
			our_view![current],
			&validator_public,
			vec![ancestors[0], genesis],
			hashmap! { current => 1, genesis => 1 },
			hashmap! {
				ancestors[0] => vec![
					occupied_core_from_candidate(&candidates[0]),
					occupied_core_from_candidate(&candidates[1]),
				],
				current => vec![
					CoreState::Occupied(OccupiedCore {
						next_up_on_available: None,
						occupied_since: 0,
						time_out_at: 10,
						next_up_on_time_out: None,
						availability: Default::default(),
						group_responsible: GroupIndex::from(0),
						candidate_hash: candidates[0].hash(),
						candidate_descriptor: candidates[0].descriptor().clone(),
					}),
					CoreState::Free,
					CoreState::Free,
					CoreState::Occupied(OccupiedCore {
						next_up_on_available: None,
						occupied_since: 1,
						time_out_at: 7,
						next_up_on_time_out: None,
						availability: Default::default(),
						group_responsible: GroupIndex::from(0),
						candidate_hash: candidates[1].hash(),
						candidate_descriptor: candidates[1].descriptor().clone(),
					}),
					CoreState::Free,
					CoreState::Free,
				]
			},
			hashmap! {
				candidates[0].hash() => true,
				candidates[1].hash() => false,
			},
			hashmap! {
				candidates[0].hash() => (pov_blocks[0].clone(), test_state.persisted_validation_data.clone()),
			},
			hashmap! {},
		).await;

		// setup peer a with interest in current
		setup_peer_with_view(&mut virtual_overseer, peer_a.clone(), view![current]).await;

		// setup peer b with interest in ancestor
		setup_peer_with_view(&mut virtual_overseer, peer_b.clone(), view![ancestors[0]]).await;
	});

	assert_matches! {
		state,
		ProtocolState {
			peer_views,
			view,
			..
		} => {
			assert_eq!(
				peer_views,
				hashmap! {
					peer_a_2 => view![current],
					peer_b_2 => view![ancestors[0]],
				},
			);
			assert_eq!(view, our_view![current]);
		}
	};
}

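/// Chunks are rewarded when first received and penalized when duplicated.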
#[test]
fn reputation_verification() {
	let test_state = TestState::default();

	let peer_a = PeerId::random();
	let peer_b = PeerId::random();
	assert_ne!(&peer_a, &peer_b);

	let keystore = test_state.keystore.clone();

	test_harness(keystore, move |test_harness| async move {
		let mut virtual_overseer = test_harness.virtual_overseer;

		let TestState {
			relay_parent: current,
			validator_public,
			ancestors,
			candidates,
			pov_blocks,
			..
		} = test_state.clone();

		let valid = make_valid_availability_gossip(
			&test_state,
			0,
			2,
		);

		change_our_view(
			&mut virtual_overseer,
			our_view![current],
			&validator_public,
			vec![ancestors[0]],
			hashmap! { current => 1 },
			hashmap! {
				current => vec![
					occupied_core_from_candidate(&candidates[0]),
					occupied_core_from_candidate(&candidates[1]),
				],
			},
			hashmap! { candidates[0].hash() => true, candidates[1].hash() => false },
			hashmap! { candidates[0].hash() => (pov_blocks[0].clone(), test_state.persisted_validation_data.clone())},
			hashmap! {},
		).await;

		// valid (first, from b)
		peer_send_message(&mut virtual_overseer, peer_b.clone(), valid.clone(), BENEFIT_VALID_MESSAGE).await;

		// valid (duplicate, from b)
		peer_send_message(&mut virtual_overseer, peer_b.clone(), valid.clone(), COST_PEER_DUPLICATE_MESSAGE).await;

		// valid (second, from a)
		peer_send_message(&mut virtual_overseer, peer_a.clone(), valid.clone(), BENEFIT_VALID_MESSAGE).await;

		// send the same message again, so we should detect the duplicate
		peer_send_message(&mut virtual_overseer, peer_a.clone(), valid.clone(), COST_PEER_DUPLICATE_MESSAGE).await;

		// disconnect and reconnect peer b, so that its per-peer state is reset
		overseer_send(&mut virtual_overseer, NetworkBridgeEvent::PeerDisconnected(peer_b.clone())).await;

		overseer_send(&mut virtual_overseer, NetworkBridgeEvent::PeerConnected(peer_b.clone(), ObservedRole::Full)).await;

		{
			// send another message
			let valid = make_valid_availability_gossip(&test_state, 1, 2);

			// Make peer a and b listen on `current`
			overseer_send(&mut virtual_overseer, NetworkBridgeEvent::PeerViewChange(peer_a.clone(), view![current])).await;

			let mut chunks = make_erasure_chunks(
				test_state.persisted_validation_data.clone(),
				validator_public.len(),
				pov_blocks[0].clone(),
			);

			// Both peers already sent us chunk 2, so it must not be sent back to them
			chunks.remove(2);

			let send_peers = chunks.iter().map(|_| vec![peer_a.clone()]).collect::<Vec<_>>();
			expect_chunks_network_message(&mut virtual_overseer, &send_peers, &[candidates[0].hash()], &chunks).await;

			overseer_send(&mut virtual_overseer, NetworkBridgeEvent::PeerViewChange(peer_b.clone(), view![current])).await;

			let send_peers = chunks.iter().map(|_| vec![peer_b.clone()]).collect::<Vec<_>>();
			expect_chunks_network_message(&mut virtual_overseer, &send_peers, &[candidates[0].hash()], &chunks).await;

			peer_send_message(&mut virtual_overseer, peer_a.clone(), valid.clone(), BENEFIT_VALID_MESSAGE_FIRST).await;

			expect_chunks_network_message(
				&mut virtual_overseer,
				&[vec![peer_b.clone()]],
				&[candidates[1].hash()],
				&[valid.erasure_chunk.clone()],
			).await;

			// Let B send the same message
			peer_send_message(&mut virtual_overseer, peer_b.clone(), valid.clone(), BENEFIT_VALID_MESSAGE).await;
		}
	});
}

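/// A chunk for a candidate outside the live candidates of our view must be punished.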
#[test]
fn not_a_live_candidate_is_detected() {
	let test_state = TestState::default();

	let peer_a = PeerId::random();

	let keystore = test_state.keystore.clone();

	test_harness(keystore, move |test_harness| async move {
		let mut virtual_overseer = test_harness.virtual_overseer;

		let TestState {
			relay_parent: current,
			validator_public,
			ancestors,
			candidates,
			pov_blocks,
			..
		} = test_state.clone();

		change_our_view(
			&mut virtual_overseer,
			our_view![current],
			&validator_public,
			vec![ancestors[0]],
			hashmap! { current => 1 },
			hashmap! {
				current => vec![
					occupied_core_from_candidate(&candidates[0]),
				],
			},
			hashmap! { candidates[0].hash() => true },
			hashmap! { candidates[0].hash() => (pov_blocks[0].clone(), test_state.persisted_validation_data.clone())},
			hashmap! {},
		).await;

		let valid = make_valid_availability_gossip(
			&test_state,
			1,
			1,
		);

		peer_send_message(&mut virtual_overseer, peer_a.clone(), valid.clone(), COST_NOT_A_LIVE_CANDIDATE).await;
	});
}

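/// A peer that announces its view before our own view changes must still be
/// served; chunks it echoes back afterwards count as plain valid messages.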
#[test]
fn peer_change_view_before_us() {
	let test_state = TestState::default();

	let peer_a = PeerId::random();

	let keystore = test_state.keystore.clone();

	test_harness(keystore, move |test_harness| async move {
		let mut virtual_overseer = test_harness.virtual_overseer;

		let TestState {
			relay_parent: current,
			validator_public,
			ancestors,
			candidates,
			pov_blocks,
			..
		} = test_state.clone();

		setup_peer_with_view(&mut virtual_overseer, peer_a.clone(), view![current]).await;

		change_our_view(
			&mut virtual_overseer,
			our_view![current],
			&validator_public,
			vec![ancestors[0]],
			hashmap! { current => 1 },
			hashmap! {
				current => vec![
					occupied_core_from_candidate(&candidates[0]),
				],
			},
			hashmap! { candidates[0].hash() => true },
			hashmap! { candidates[0].hash() => (pov_blocks[0].clone(), test_state.persisted_validation_data.clone())},
			hashmap! { candidates[0].hash() => vec![peer_a.clone()] },
		).await;

		let valid = make_valid_availability_gossip(
			&test_state,
			0,
			0,
		);

		// We already sent peer a all the chunks of candidates[0], so a chunk it
		// sends back is valid, but not new
		peer_send_message(&mut virtual_overseer, peer_a.clone(), valid.clone(), BENEFIT_VALID_MESSAGE).await;
	});
}

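/// Chunks received while a candidate is live are kept in the message vault and
/// served to peers that connect later.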
#[test]
fn candidate_chunks_are_put_into_message_vault_when_candidate_is_first_seen() {
	let test_state = TestState::default();

	let peer_a = PeerId::random();

	let keystore = test_state.keystore.clone();

	test_harness(keystore, move |test_harness| async move {
		let mut virtual_overseer = test_harness.virtual_overseer;

		let TestState {
			relay_parent: current,
			validator_public,
			ancestors,
			candidates,
			pov_blocks,
			..
		} = test_state.clone();

		change_our_view(
			&mut virtual_overseer,
			our_view![ancestors[0]],
			&validator_public,
			vec![ancestors[1]],
			hashmap! { ancestors[0] => 1 },
			hashmap! {
				ancestors[0] => vec![
					occupied_core_from_candidate(&candidates[0]),
				],
			},
			hashmap! { candidates[0].hash() => true },
			hashmap! { candidates[0].hash() => (pov_blocks[0].clone(), test_state.persisted_validation_data.clone())},
			hashmap! {},
		).await;

		change_our_view(
			&mut virtual_overseer,
			our_view![current],
			&validator_public,
			vec![ancestors[0]],
			hashmap! { current => 1 },
			hashmap! {
				current => vec![
					occupied_core_from_candidate(&candidates[0]),
				],
			},
			hashmap! { candidates[0].hash() => true },
			hashmap! {},
			hashmap! {},
		).await;

		// Let peer a connect; we should send it all the chunks of the candidate
		setup_peer_with_view(&mut virtual_overseer, peer_a.clone(), view![current]).await;

		let chunks = make_erasure_chunks(
			test_state.persisted_validation_data.clone(),
			validator_public.len(),
			pov_blocks[0].clone(),
		);
		let send_peers = chunks.iter().map(|_| vec![peer_a.clone()]).collect::<Vec<_>>();
		expect_chunks_network_message(
			&mut virtual_overseer,
			&send_peers,
			&[candidates[0].hash()],
			&chunks,
		).await;
	});
}

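/// `query_up_to_k_ancestors_in_same_session` must only return ancestors that
/// share the relay parent's session index.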
#[test]
fn k_ancestors_in_session() {
	let pool = sp_core::testing::TaskExecutor::new();
	let (mut ctx, mut virtual_overseer) =
		test_helpers::make_subsystem_context::<AvailabilityDistributionMessage, _>(pool);

	const DATA: &[(Hash, SessionIndex)] = &[
		(Hash::repeat_byte(0x32), 3), // relay parent
		(Hash::repeat_byte(0x31), 3), // grand parent
		(Hash::repeat_byte(0x30), 3), // great ...
		(Hash::repeat_byte(0x20), 2),
		(Hash::repeat_byte(0x12), 1),
		(Hash::repeat_byte(0x11), 1),
		(Hash::repeat_byte(0x10), 1),
	];
	const K: usize = 5;

	const EXPECTED: &[Hash] = &[DATA[1].0, DATA[2].0];

	let test_fut = async move {
		assert_matches!(
			overseer_recv(&mut virtual_overseer).await,
			AllMessages::ChainApi(ChainApiMessage::Ancestors {
				hash: relay_parent,
				k,
				response_channel: tx,
			}) => {
				assert_eq!(k, K+1);
				assert_eq!(relay_parent, DATA[0].0);
				tx.send(Ok(DATA[1..=k].into_iter().map(|x| x.0).collect::<Vec<_>>())).unwrap();
			}
		);

		// query the desired session index of the relay parent
		assert_matches!(
			overseer_recv(&mut virtual_overseer).await,
			AllMessages::RuntimeApi(RuntimeApiMessage::Request(
				relay_parent,
				RuntimeApiRequest::SessionIndexForChild(tx),
			)) => {
				assert_eq!(relay_parent, DATA[0].0);
				let session: SessionIndex = DATA[0].1;
				tx.send(Ok(session)).unwrap();
			}
		);

		// query ancestors
		for i in 2usize..=(EXPECTED.len() + 1 + 1) {
			assert_matches!(
				overseer_recv(&mut virtual_overseer).await,
				AllMessages::RuntimeApi(RuntimeApiMessage::Request(
					relay_parent,
					RuntimeApiRequest::SessionIndexForChild(tx),
				)) => {
					// query is for ancestor_parent
					let x = &DATA[i];
					assert_eq!(relay_parent, x.0);
					// but needs to yield ancestor_parent's child's session index
					let x = &DATA[i-1];
					tx.send(Ok(x.1)).unwrap();
				}
			);
		}
	};

	let sut = async move {
		let ancestors = query_up_to_k_ancestors_in_same_session(&mut ctx, DATA[0].0, K)
			.await
			.unwrap();
		assert_eq!(ancestors, EXPECTED.to_vec());
	};

	futures::pin_mut!(test_fut);
	futures::pin_mut!(sut);

	executor::block_on(future::select(test_fut, sut));
}