// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

//! The collation generation subsystem is the interface between Polkadot and the collators.
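//!
//! A collator enables this subsystem by sending it a single
//! `CollationGenerationMessage::Initialize` carrying a `CollationGenerationConfig`.
//! A minimal sketch of that handshake follows; the `overseer_handle` and the exact
//! delivery path of the message are assumptions for illustration, not defined in
//! this file:
//!
//! ```ignore
//! let config = CollationGenerationConfig {
//!     key: CollatorPair::generate().0,
//!     para_id: 100.into(),
//!     // invoked once per relay parent on which a core is scheduled for
//!     // `para_id`; returning `None` means "no collation this time".
//!     collator: Box::new(|_relay_parent: Hash, _vd: &ValidationData| {
//!         async move { None }.boxed()
//!     }),
//! };
//! overseer_handle
//!     .send_msg(AllMessages::CollationGeneration(
//!         CollationGenerationMessage::Initialize(config),
//!     ))
//!     .await;
//! ```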

#![deny(missing_docs)]

use futures::{
	channel::mpsc,
	future::FutureExt,
	join,
	select,
	sink::SinkExt,
	stream::StreamExt,
};
use polkadot_node_primitives::CollationGenerationConfig;
use polkadot_node_subsystem::{
	messages::{AllMessages, CollationGenerationMessage, CollatorProtocolMessage},
	FromOverseer, SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemResult,
};
use polkadot_node_subsystem_util::{
	request_availability_cores_ctx, request_full_validation_data_ctx,
	request_validators_ctx,
	metrics::{self, prometheus},
};
use polkadot_primitives::v1::{
	collator_signature_payload, AvailableData, CandidateCommitments,
	CandidateDescriptor, CandidateReceipt, CoreState, Hash, OccupiedCoreAssumption,
	PersistedValidationData, PoV,
};
use sp_core::crypto::Pair;
use std::sync::Arc;

mod error;

const LOG_TARGET: &'static str = "collation_generation";

/// Collation Generation Subsystem
pub struct CollationGenerationSubsystem {
	config: Option<Arc<CollationGenerationConfig>>,
	metrics: Metrics,
}

impl CollationGenerationSubsystem {
	/// Create a new instance of the `CollationGenerationSubsystem`.
	pub fn new(metrics: Metrics) -> Self {
		Self {
			config: None,
			metrics,
		}
	}

	/// Run this subsystem.
	///
	/// Conceptually, this is very simple: it just loops forever.
	///
	/// - On incoming overseer messages, it initializes the configuration,
	///   handles newly activated leaves, or concludes, as appropriate.
	/// - On messages generated by the collation tasks it spawns, it forwards
	///   them to the overseer.
	///
	/// Errors while handling new activations are logged and discarded; an error
	/// receiving from the overseer context terminates the loop.
	#[tracing::instrument(skip(self, ctx), fields(subsystem = LOG_TARGET))]
	async fn run<Context>(mut self, mut ctx: Context)
	where
		Context: SubsystemContext<Message = CollationGenerationMessage>,
	{
		// when we activate new leaves, we spawn a bunch of sub-tasks, each of which is
		// expected to generate precisely one message. We don't want to block the main loop
		// at any point waiting for them all, so instead, we create a channel on which they can
		// send those messages. We can then just monitor the channel and forward messages on it
		// to the overseer here, via the context.
		let (sender, receiver) = mpsc::channel(0);

		let mut receiver = receiver.fuse();
		loop {
			select! {
				incoming = ctx.recv().fuse() => {
					if self.handle_incoming::<Context>(incoming, &mut ctx, &sender).await {
						break;
					}
				},
				msg = receiver.next() => {
					if let Some(msg) = msg {
						ctx.send_message(msg).await;
					}
				},
			}
		}
	}

	// Handle an incoming message. Returns true if the run loop should break afterwards.
	// Note: this doesn't strictly need to be a separate function; it exists to keep
	// the run loop uncluttered, and could in principle be inlined there.
	#[tracing::instrument(level = "trace", skip(self, ctx, sender), fields(subsystem = LOG_TARGET))]
	async fn handle_incoming<Context>(
		&mut self,
		incoming: SubsystemResult<FromOverseer<Context::Message>>,
		ctx: &mut Context,
		sender: &mpsc::Sender<AllMessages>,
	) -> bool
	where
		Context: SubsystemContext<Message = CollationGenerationMessage>,
	{
		use polkadot_node_subsystem::ActiveLeavesUpdate;
		use polkadot_node_subsystem::FromOverseer::{Communication, Signal};
		use polkadot_node_subsystem::OverseerSignal::{ActiveLeaves, BlockFinalized, Conclude};

		match incoming {
			Ok(Signal(ActiveLeaves(ActiveLeavesUpdate { activated, .. }))) => {
				// follow the procedure from the guide
				if let Some(config) = &self.config {
					let metrics = self.metrics.clone();
					if let Err(err) =
						handle_new_activations(config.clone(), &activated, ctx, metrics, sender).await
					{
						tracing::warn!(target: LOG_TARGET, err = ?err, "failed to handle new activations");
					};
				}
				false
			}
			Ok(Signal(Conclude)) => true,
			Ok(Communication {
				msg: CollationGenerationMessage::Initialize(config),
			}) => {
				if self.config.is_some() {
					tracing::error!(target: LOG_TARGET, "double initialization");
				} else {
					self.config = Some(Arc::new(config));
				}
				false
			}
			Ok(Signal(BlockFinalized(..))) => false,
			Err(err) => {
				tracing::error!(
					target: LOG_TARGET,
					err = ?err,
					"error receiving message from subsystem context",
				);
				true
			}
		}
	}
}

impl<Context> Subsystem<Context> for CollationGenerationSubsystem
where
	Context: SubsystemContext<Message = CollationGenerationMessage>,
{
	fn start(self, ctx: Context) -> SpawnedSubsystem {
		let future = Box::pin(async move {
			self.run(ctx).await;
			Ok(())
		});

		SpawnedSubsystem {
			name: "collation-generation-subsystem",
			future,
		}
	}
}

#[tracing::instrument(level = "trace", skip(ctx, metrics, sender), fields(subsystem = LOG_TARGET))]
async fn handle_new_activations<Context: SubsystemContext>(
	config: Arc<CollationGenerationConfig>,
	activated: &[Hash],
	ctx: &mut Context,
	metrics: Metrics,
	sender: &mpsc::Sender<AllMessages>,
) -> crate::error::Result<()> {
	// follow the procedure from the guide:
	// https://w3f.github.io/parachain-implementers-guide/node/collators/collation-generation.html
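	//
	// In outline, for each activated relay parent:
	//   1. fetch the availability cores and the current validator set;
	//   2. skip cores that are free, occupied, or scheduled for another para;
	//   3. fetch the full validation data for the core scheduled for our para;
	//   4. spawn a task that runs the collator, computes the erasure root,
	//      signs the candidate descriptor, and hands the result to the
	//      collator protocol for distribution.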

	let _overall_timer = metrics.time_new_activations();

	for relay_parent in activated.iter().copied() {
		let _relay_parent_timer = metrics.time_new_activations_relay_parent();

		// These requests are two-layered futures: awaiting the outer layer takes a
		// mutable borrow of the context and sends the request, yielding a receiver;
		// `join!` then drives both receivers to completion to get the responses.
		let (availability_cores, validators) = join!(
			request_availability_cores_ctx(relay_parent, ctx).await?,
			request_validators_ctx(relay_parent, ctx).await?,
		);

		let availability_cores = availability_cores??;
		let n_validators = validators??.len();

		for (core_idx, core) in availability_cores.into_iter().enumerate() {
			let _availability_core_timer = metrics.time_new_activations_availability_core();

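			// Only cores scheduled to our para are interesting; a scheduled (not yet
			// occupied) core pairs with `OccupiedCoreAssumption::Free` when requesting
			// validation data below.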
			let (scheduled_core, assumption) = match core {
				CoreState::Scheduled(scheduled_core) => {
					(scheduled_core, OccupiedCoreAssumption::Free)
				}
				CoreState::Occupied(_occupied_core) => {
					// TODO: https://github.com/paritytech/polkadot/issues/1573
					tracing::trace!(
						target: LOG_TARGET,
						core_idx = %core_idx,
						relay_parent = ?relay_parent,
						"core is occupied. Keep going.",
					);
					continue;
				}
				CoreState::Free => {
					tracing::trace!(
						target: LOG_TARGET,
						core_idx = %core_idx,
						"core is free. Keep going.",
					);
					continue
				}
			};

			if scheduled_core.para_id != config.para_id {
				tracing::trace!(
					target: LOG_TARGET,
					core_idx = %core_idx,
					relay_parent = ?relay_parent,
					our_para = %config.para_id,
					their_para = %scheduled_core.para_id,
					"core is not assigned to our para. Keep going.",
				);
				continue;
			}

			// we get validation data synchronously for each core instead of
			// within the subtask loop, because we have only a single mutable handle to the
			// context, so the work can't really be distributed
			let validation_data = match request_full_validation_data_ctx(
				relay_parent,
				scheduled_core.para_id,
				assumption,
				ctx,
			)
			.await?
			.await??
			{
				Some(v) => v,
				None => {
					tracing::trace!(
						target: LOG_TARGET,
						core_idx = %core_idx,
						relay_parent = ?relay_parent,
						our_para = %config.para_id,
						their_para = %scheduled_core.para_id,
						"validation data is not available",
					);
					continue
				}
			};

			let task_config = config.clone();
			let mut task_sender = sender.clone();
			let metrics = metrics.clone();
			ctx.spawn("collation generation collation builder", Box::pin(async move {
				let persisted_validation_data_hash = validation_data.persisted.hash();

				let collation = match (task_config.collator)(relay_parent, &validation_data).await {
					Some(collation) => collation,
					None => {
						tracing::debug!(
							target: LOG_TARGET,
							para_id = %scheduled_core.para_id,
							"collator returned no collation on collate",
						);
						return
					}
				};

				let pov_hash = collation.proof_of_validity.hash();

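				// The collator signs over (relay parent, para id, persisted validation
				// data hash, PoV hash); this signature is embedded in the candidate
				// descriptor below.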
				let signature_payload = collator_signature_payload(
					&relay_parent,
					&scheduled_core.para_id,
					&persisted_validation_data_hash,
					&pov_hash,
				);

				let erasure_root = match erasure_root(
					n_validators,
					validation_data.persisted,
					collation.proof_of_validity.clone(),
				) {
					Ok(erasure_root) => erasure_root,
					Err(err) => {
						tracing::error!(
							target: LOG_TARGET,
							para_id = %scheduled_core.para_id,
							err = ?err,
							"failed to calculate erasure root",
						);
						return
					}
				};

				let commitments = CandidateCommitments {
					upward_messages: collation.upward_messages,
					horizontal_messages: collation.horizontal_messages,
					new_validation_code: collation.new_validation_code,
					head_data: collation.head_data,
					processed_downward_messages: collation.processed_downward_messages,
					hrmp_watermark: collation.hrmp_watermark,
				};

				let ccr = CandidateReceipt {
					commitments_hash: commitments.hash(),
					descriptor: CandidateDescriptor {
						signature: task_config.key.sign(&signature_payload),
						para_id: scheduled_core.para_id,
						relay_parent,
						collator: task_config.key.public(),
						persisted_validation_data_hash,
						pov_hash,
						erasure_root,
					},
				};

				metrics.on_collation_generated();

				if let Err(err) = task_sender.send(AllMessages::CollatorProtocol(
					CollatorProtocolMessage::DistributeCollation(ccr, collation.proof_of_validity)
				)).await {
					tracing::warn!(
						target: LOG_TARGET,
						para_id = %scheduled_core.para_id,
						err = ?err,
						"failed to send collation result",
					);
				}
			})).await?;
		}
	}

	Ok(())
}

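// Compute the Merkle root of the erasure coding of the available data: the
// persisted validation data and PoV are erasure-coded into one chunk per
// validator, and the root of the Merkle tree over those chunks is what the
// candidate descriptor commits to.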
#[tracing::instrument(level = "trace", fields(subsystem = LOG_TARGET))]
fn erasure_root(
	n_validators: usize,
	persisted_validation: PersistedValidationData,
	pov: PoV,
) -> crate::error::Result<Hash> {
	let available_data = AvailableData {
		validation_data: persisted_validation,
		pov: Arc::new(pov),
	};

	let chunks = polkadot_erasure_coding::obtain_chunks_v1(n_validators, &available_data)?;
	Ok(polkadot_erasure_coding::branches(&chunks).root())
}

#[derive(Clone)]
struct MetricsInner {
	collations_generated_total: prometheus::Counter<prometheus::U64>,
	new_activations_overall: prometheus::Histogram,
	new_activations_per_relay_parent: prometheus::Histogram,
	new_activations_per_availability_core: prometheus::Histogram,
}

/// CollationGenerationSubsystem metrics.
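///
/// A `Metrics(None)` handle (the `Default`) is a no-op: every accessor checks the
/// inner `Option`, so an unregistered instance (as used in the tests) records nothing.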
#[derive(Default, Clone)]
pub struct Metrics(Option<MetricsInner>);

impl Metrics {
	fn on_collation_generated(&self) {
		if let Some(metrics) = &self.0 {
			metrics.collations_generated_total.inc();
		}
	}

	/// Provide a timer for new activations which updates on drop.
	fn time_new_activations(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
		self.0.as_ref().map(|metrics| metrics.new_activations_overall.start_timer())
	}

	/// Provide a timer per relay parents which updates on drop.
	fn time_new_activations_relay_parent(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
		self.0.as_ref().map(|metrics| metrics.new_activations_per_relay_parent.start_timer())
	}

	/// Provide a timer per availability core which updates on drop.
	fn time_new_activations_availability_core(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
		self.0.as_ref().map(|metrics| metrics.new_activations_per_availability_core.start_timer())
	}
}

impl metrics::Metrics for Metrics {
	fn try_register(registry: &prometheus::Registry) -> Result<Self, prometheus::PrometheusError> {
		let metrics = MetricsInner {
			collations_generated_total: prometheus::register(
				prometheus::Counter::new(
					"parachain_collations_generated_total",
					"Number of collations generated."
				)?,
				registry,
			)?,
			new_activations_overall: prometheus::register(
				prometheus::Histogram::with_opts(
					prometheus::HistogramOpts::new(
						"parachain_collation_generation_new_activations",
						"Time spent within fn handle_new_activations",
					)
				)?,
				registry,
			)?,
			new_activations_per_relay_parent: prometheus::register(
				prometheus::Histogram::with_opts(
					prometheus::HistogramOpts::new(
						"parachain_collation_generation_per_relay_parent",
						"Time spent handling a particular relay parent within fn handle_new_activations"
					)
				)?,
				registry,
			)?,
			new_activations_per_availability_core: prometheus::register(
				prometheus::Histogram::with_opts(
					prometheus::HistogramOpts::new(
						"parachain_collation_generation_per_availability_core",
						"Time spent handling a particular availability core for a relay parent in fn handle_new_activations",
					)
				)?,
				registry,
			)?,
		};
		Ok(Metrics(Some(metrics)))
	}
}

#[cfg(test)]
mod tests {
	mod handle_new_activations {
		use super::super::*;
		use futures::{
			lock::Mutex,
			task::{Context as FuturesContext, Poll},
			Future,
		};
		use polkadot_node_primitives::Collation;
		use polkadot_node_subsystem::messages::{
			AllMessages, RuntimeApiMessage, RuntimeApiRequest,
		};
		use polkadot_node_subsystem_test_helpers::{
			subsystem_test_harness, TestSubsystemContextHandle,
		};
		use polkadot_primitives::v1::{
			BlockData, BlockNumber, CollatorPair, Id as ParaId,
			PersistedValidationData, PoV, ScheduledCore, ValidationData,
		};
		use std::pin::Pin;

		fn test_collation() -> Collation {
			Collation {
				upward_messages: Default::default(),
				horizontal_messages: Default::default(),
				new_validation_code: Default::default(),
				head_data: Default::default(),
				proof_of_validity: PoV {
					block_data: BlockData(Vec::new()),
				},
				processed_downward_messages: Default::default(),
				hrmp_watermark: Default::default(),
			}
		}

		// A test collator future: always immediately ready with `Some(test_collation())`.
		struct TestCollator;

		impl Future for TestCollator {
			type Output = Option<Collation>;

			fn poll(self: Pin<&mut Self>, _cx: &mut FuturesContext) -> Poll<Self::Output> {
				Poll::Ready(Some(test_collation()))
			}
		}

		impl Unpin for TestCollator {}

		fn test_config<Id: Into<ParaId>>(para_id: Id) -> Arc<CollationGenerationConfig> {
			Arc::new(CollationGenerationConfig {
				key: CollatorPair::generate().0,
				collator: Box::new(|_: Hash, _vd: &ValidationData| {
					TestCollator.boxed()
				}),
				para_id: para_id.into(),
			})
		}

		fn scheduled_core_for<Id: Into<ParaId>>(para_id: Id) -> ScheduledCore {
			ScheduledCore {
				para_id: para_id.into(),
				collator: None,
			}
		}

		#[test]
		fn requests_availability_per_relay_parent() {
			let activated_hashes: Vec<Hash> = vec![
				[1; 32].into(),
				[4; 32].into(),
				[9; 32].into(),
				[16; 32].into(),
			];

			let requested_availability_cores = Arc::new(Mutex::new(Vec::new()));

			let overseer_requested_availability_cores = requested_availability_cores.clone();
			let overseer = |mut handle: TestSubsystemContextHandle<CollationGenerationMessage>| async move {
				loop {
					match handle.try_recv().await {
						None => break,
						Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(hash, RuntimeApiRequest::AvailabilityCores(tx)))) => {
							overseer_requested_availability_cores.lock().await.push(hash);
							tx.send(Ok(vec![])).unwrap();
						}
						Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(_hash, RuntimeApiRequest::Validators(tx)))) => {
							tx.send(Ok(vec![Default::default(); 3])).unwrap();
						}
						Some(msg) => panic!("didn't expect any other overseer requests given no availability cores; got {:?}", msg),
					}
				}
			};

			let (tx, _rx) = mpsc::channel(0);

			let subsystem_activated_hashes = activated_hashes.clone();
			subsystem_test_harness(overseer, |mut ctx| async move {
				handle_new_activations(
					test_config(123u32),
					&subsystem_activated_hashes,
					&mut ctx,
					Metrics(None),
					&tx,
				)
				.await
				.unwrap();
			});

			let mut requested_availability_cores = Arc::try_unwrap(requested_availability_cores)
				.expect("overseer should have shut down by now")
				.into_inner();
			requested_availability_cores.sort();

			assert_eq!(requested_availability_cores, activated_hashes);
		}

		#[test]
		fn requests_validation_data_for_scheduled_matches() {
			let activated_hashes: Vec<Hash> = vec![
				Hash::repeat_byte(1),
				Hash::repeat_byte(4),
				Hash::repeat_byte(9),
				Hash::repeat_byte(16),
			];

			let requested_full_validation_data = Arc::new(Mutex::new(Vec::new()));

			let overseer_requested_full_validation_data = requested_full_validation_data.clone();
			let overseer = |mut handle: TestSubsystemContextHandle<CollationGenerationMessage>| async move {
				loop {
					match handle.try_recv().await {
						None => break,
						Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
							hash,
							RuntimeApiRequest::AvailabilityCores(tx),
						))) => {
							tx.send(Ok(vec![
								CoreState::Free,
								// this is weird, see explanation below
								CoreState::Scheduled(scheduled_core_for(
									(hash.as_fixed_bytes()[0] * 4) as u32,
								)),
								CoreState::Scheduled(scheduled_core_for(
									(hash.as_fixed_bytes()[0] * 5) as u32,
								)),
							]))
							.unwrap();
						}
						Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
							hash,
							RuntimeApiRequest::FullValidationData(
								_para_id,
								_occupied_core_assumption,
								tx,
							),
						))) => {
							overseer_requested_full_validation_data
								.lock()
								.await
								.push(hash);
							tx.send(Ok(Default::default())).unwrap();
						}
						Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
							_hash,
							RuntimeApiRequest::Validators(tx),
						))) => {
							tx.send(Ok(vec![Default::default(); 3])).unwrap();
						}
						Some(msg) => {
							panic!("didn't expect any other overseer requests; got {:?}", msg)
						}
					}
				}
			};

			let (tx, _rx) = mpsc::channel(0);

			subsystem_test_harness(overseer, |mut ctx| async move {
				handle_new_activations(test_config(16), &activated_hashes, &mut ctx, Metrics(None), &tx)
					.await
					.unwrap();
			});

			let requested_full_validation_data = Arc::try_unwrap(requested_full_validation_data)
				.expect("overseer should have shut down by now")
				.into_inner();

			// the only requested hash should be the one consisting of repeated 4s:
			// each activated hash generates two scheduled cores, one for (first byte * 4)
			// and one for (first byte * 5). With the test para_id of 16, only the
			// `[4; 32]` hash can match: 4 * 4 == 16.
			assert_eq!(requested_full_validation_data, vec![[4; 32].into()]);
		}

		#[test]
		fn sends_distribute_collation_message() {
			let activated_hashes: Vec<Hash> = vec![
				Hash::repeat_byte(1),
				Hash::repeat_byte(4),
				Hash::repeat_byte(9),
				Hash::repeat_byte(16),
			];

			let overseer = |mut handle: TestSubsystemContextHandle<CollationGenerationMessage>| async move {
				loop {
					match handle.try_recv().await {
						None => break,
						Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
							hash,
							RuntimeApiRequest::AvailabilityCores(tx),
						))) => {
							tx.send(Ok(vec![
								CoreState::Free,
								// this is weird, see the explanation in the test above
								CoreState::Scheduled(scheduled_core_for(
									(hash.as_fixed_bytes()[0] * 4) as u32,
								)),
								CoreState::Scheduled(scheduled_core_for(
									(hash.as_fixed_bytes()[0] * 5) as u32,
								)),
							]))
							.unwrap();
						}
						Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
							_hash,
							RuntimeApiRequest::FullValidationData(
								_para_id,
								_occupied_core_assumption,
								tx,
							),
						))) => {
							tx.send(Ok(Some(Default::default()))).unwrap();
						}
						Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
							_hash,
							RuntimeApiRequest::Validators(tx),
						))) => {
							tx.send(Ok(vec![Default::default(); 3])).unwrap();
						}
						Some(msg) => {
							panic!("didn't expect any other overseer requests; got {:?}", msg)
						}
					}
				}
			};

			let config = test_config(16);
			let subsystem_config = config.clone();

			let (tx, rx) = mpsc::channel(0);

			// the initial empty vec is replaced wholesale below; that's fine, since an
			// empty vec doesn't allocate on the heap
			let sent_messages = Arc::new(Mutex::new(Vec::new()));
			let subsystem_sent_messages = sent_messages.clone();
			subsystem_test_harness(overseer, |mut ctx| async move {
				handle_new_activations(subsystem_config, &activated_hashes, &mut ctx, Metrics(None), &tx)
					.await
					.unwrap();

				std::mem::drop(tx);

				// collect all sent messages
				*subsystem_sent_messages.lock().await = rx.collect().await;
			});

			let sent_messages = Arc::try_unwrap(sent_messages)
				.expect("subsystem should have shut down by now")
				.into_inner();

			// we expect a single message to be sent, containing a candidate receipt.
			// we don't care too much about the commitments_hash right now, but let's ensure that we've calculated the
			// correct descriptor
			let expect_pov_hash = test_collation().proof_of_validity.hash();
			let expect_validation_data_hash = PersistedValidationData::<BlockNumber>::default().hash();
			let expect_relay_parent = Hash::repeat_byte(4);
			let expect_payload = collator_signature_payload(
				&expect_relay_parent,
				&config.para_id,
				&expect_validation_data_hash,
				&expect_pov_hash,
			);
			let expect_descriptor = CandidateDescriptor {
				signature: config.key.sign(&expect_payload),
				para_id: config.para_id,
				relay_parent: expect_relay_parent,
				collator: config.key.public(),
				persisted_validation_data_hash: expect_validation_data_hash,
				pov_hash: expect_pov_hash,
				erasure_root: Default::default(), // this isn't something we're checking right now
			};

			assert_eq!(sent_messages.len(), 1);
			match &sent_messages[0] {
				AllMessages::CollatorProtocol(CollatorProtocolMessage::DistributeCollation(
					CandidateReceipt { descriptor, .. },
					_pov,
				)) => {
					// signature generation is non-deterministic, so we can't just assert that the
					// expected descriptor is correct. What we can do is validate that the produced
					// descriptor has a valid signature, then just copy in the generated signature
					// and check the rest of the fields for equality.
					assert!(CollatorPair::verify(
						&descriptor.signature,
						&collator_signature_payload(
							&descriptor.relay_parent,
							&descriptor.para_id,
							&descriptor.persisted_validation_data_hash,
							&descriptor.pov_hash,
						)
						.as_ref(),
						&descriptor.collator,
					));
					let expect_descriptor = {
						let mut expect_descriptor = expect_descriptor;
						expect_descriptor.signature = descriptor.signature.clone();
						expect_descriptor.erasure_root = descriptor.erasure_root.clone();
						expect_descriptor
					};
					assert_eq!(descriptor, &expect_descriptor);
				}
				_ => panic!("received wrong message type"),
			}
		}
	}
}