// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

//! The collation generation subsystem is the interface between Polkadot and the collators.
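//!
//! To use it, a collator first sends [`CollationGenerationMessage::Initialize`] carrying a
//! [`CollationGenerationConfig`]; afterwards, every newly-activated relay-chain leaf on which a
//! core is scheduled for the configured para triggers collation building and distribution.
//! A minimal initialization sketch (`overseer_handle` and `collate_fn` are hypothetical
//! placeholders standing in for the caller's overseer handle and collator function):
//!
//! ```ignore
//! use polkadot_node_primitives::CollationGenerationConfig;
//! use polkadot_node_subsystem::messages::CollationGenerationMessage;
//! use polkadot_primitives::v1::CollatorPair;
//! use sp_core::crypto::Pair;
//!
//! let config = CollationGenerationConfig {
//!     key: CollatorPair::generate().0,  // key used to sign candidate descriptors
//!     collator: Box::new(collate_fn),   // async fn producing an `Option<CollationResult>`
//!     para_id: 100.into(),              // the para this collator serves
//! };
//! overseer_handle
//!     .send_msg(CollationGenerationMessage::Initialize(config))
//!     .await;
//! ```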

#![deny(missing_docs)]

use futures::{
	channel::mpsc,
	future::FutureExt,
	join,
	select,
	sink::SinkExt,
	stream::StreamExt,
};
use polkadot_node_primitives::CollationGenerationConfig;
use polkadot_node_subsystem::{
	messages::{AllMessages, CollationGenerationMessage, CollatorProtocolMessage},
	FromOverseer, SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemResult,
};
use polkadot_node_subsystem_util::{
	request_availability_cores_ctx, request_persisted_validation_data_ctx,
	request_validators_ctx,
	metrics::{self, prometheus},
};
use polkadot_primitives::v1::{
	collator_signature_payload, AvailableData, CandidateCommitments,
	CandidateDescriptor, CandidateReceipt, CoreState, Hash, OccupiedCoreAssumption,
	PersistedValidationData, PoV,
};
use sp_core::crypto::Pair;
use std::sync::Arc;

mod error;

const LOG_TARGET: &'static str = "parachain::collation-generation";

/// Collation Generation Subsystem
pub struct CollationGenerationSubsystem {
	config: Option<Arc<CollationGenerationConfig>>,
	metrics: Metrics,
}

impl CollationGenerationSubsystem {
	/// Create a new instance of the `CollationGenerationSubsystem`.
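	///
	/// A minimal construction sketch; the subsystem is subsequently handed to the
	/// overseer, which drives it via [`Subsystem::start`]:
	///
	/// ```ignore
	/// let subsystem = CollationGenerationSubsystem::new(Metrics::default());
	/// ```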
	pub fn new(metrics: Metrics) -> Self {
		Self {
			config: None,
			metrics,
		}
	}

	/// Run this subsystem
	///
	/// Conceptually, this is very simple: it just loops forever.
	///
	/// - On incoming overseer messages, it initializes the config or handles new
	///   leaf activations by spawning collation-building tasks.
	/// - On outgoing messages from those tasks, it forwards them to the overseer.
	///
	/// Errors are logged and then discarded.
	#[tracing::instrument(skip(self, ctx), fields(subsystem = LOG_TARGET))]
	async fn run<Context>(mut self, mut ctx: Context)
	where
		Context: SubsystemContext<Message = CollationGenerationMessage>,
	{
		// when we activate new leaves, we spawn a bunch of sub-tasks, each of which is
		// expected to generate precisely one message. We don't want to block the main loop
		// at any point waiting for them all, so instead, we create a channel on which they can
		// send those messages. We can then just monitor the channel and forward messages on it
		// to the overseer here, via the context.
		let (sender, receiver) = mpsc::channel(0);

		let mut receiver = receiver.fuse();
		loop {
			select! {
				incoming = ctx.recv().fuse() => {
					if self.handle_incoming::<Context>(incoming, &mut ctx, &sender).await {
						break;
					}
				},
				msg = receiver.next() => {
					if let Some(msg) = msg {
						ctx.send_message(msg).await;
					}
				},
			}
		}
	}

	// Handle an incoming message. Returns true if we should break afterwards.
	// Note: this doesn't strictly need to be a separate function; it's more an administrative
	// function so that we don't clutter the run loop. It could in principle be inlined directly
	// into there. It should therefore be fine that it's an async function mutably borrowing self.
	#[tracing::instrument(level = "trace", skip(self, ctx, sender), fields(subsystem = LOG_TARGET))]
	async fn handle_incoming<Context>(
		&mut self,
		incoming: SubsystemResult<FromOverseer<Context::Message>>,
		ctx: &mut Context,
		sender: &mpsc::Sender<AllMessages>,
	) -> bool
	where
		Context: SubsystemContext<Message = CollationGenerationMessage>,
	{
		use polkadot_node_subsystem::ActiveLeavesUpdate;
		use polkadot_node_subsystem::FromOverseer::{Communication, Signal};
		use polkadot_node_subsystem::OverseerSignal::{ActiveLeaves, BlockFinalized, Conclude};

		match incoming {
			Ok(Signal(ActiveLeaves(ActiveLeavesUpdate { activated, .. }))) => {
				// follow the procedure from the guide
				if let Some(config) = &self.config {
					let metrics = self.metrics.clone();
					if let Err(err) = handle_new_activations(
						config.clone(),
						activated.into_iter().map(|v| v.0),
						ctx,
						metrics,
						sender,
					).await {
						tracing::warn!(target: LOG_TARGET, err = ?err, "failed to handle new activations");
					}
				}

				false
			}
			Ok(Signal(Conclude)) => true,
			Ok(Communication {
				msg: CollationGenerationMessage::Initialize(config),
			}) => {
				if self.config.is_some() {
					tracing::error!(target: LOG_TARGET, "double initialization");
				} else {
					self.config = Some(Arc::new(config));
				}
				false
			}
			Ok(Signal(BlockFinalized(..))) => false,
			Err(err) => {
				tracing::error!(
					target: LOG_TARGET,
					err = ?err,
					"error receiving message from subsystem context: {:?}",
					err
				);
				true
			}
		}
	}
}

impl<Context> Subsystem<Context> for CollationGenerationSubsystem
where
	Context: SubsystemContext<Message = CollationGenerationMessage>,
{
	fn start(self, ctx: Context) -> SpawnedSubsystem {
		let future = async move {
			self.run(ctx).await;
			Ok(())
		}.boxed();

		SpawnedSubsystem {
			name: "collation-generation-subsystem",
			future,
		}
	}
}

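/// Handle a batch of newly-activated relay-chain leaves.
///
/// For each activated leaf this requests the availability cores and the validator set,
/// and for every core scheduled to our para it spawns a background task that runs the
/// configured collator, assembles the `CandidateReceipt`, and sends a
/// `DistributeCollation` message back through `sender`.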
#[tracing::instrument(level = "trace", skip(ctx, metrics, sender, activated), fields(subsystem = LOG_TARGET))]
async fn handle_new_activations<Context: SubsystemContext>(
	config: Arc<CollationGenerationConfig>,
	activated: impl IntoIterator<Item = Hash>,
	ctx: &mut Context,
	metrics: Metrics,
	sender: &mpsc::Sender<AllMessages>,
) -> crate::error::Result<()> {
	// follow the procedure from the guide:
	// https://w3f.github.io/parachain-implementers-guide/node/collators/collation-generation.html

	let _overall_timer = metrics.time_new_activations();

	for relay_parent in activated {
		let _relay_parent_timer = metrics.time_new_activations_relay_parent();

		let (availability_cores, validators) = join!(
			request_availability_cores_ctx(relay_parent, ctx).await?,
			request_validators_ctx(relay_parent, ctx).await?,
		);

		let availability_cores = availability_cores??;
		let n_validators = validators??.len();

		for (core_idx, core) in availability_cores.into_iter().enumerate() {
			let _availability_core_timer = metrics.time_new_activations_availability_core();

			let (scheduled_core, assumption) = match core {
				CoreState::Scheduled(scheduled_core) => {
					(scheduled_core, OccupiedCoreAssumption::Free)
				}
				CoreState::Occupied(_occupied_core) => {
					// TODO: https://github.com/paritytech/polkadot/issues/1573
					tracing::trace!(
						target: LOG_TARGET,
						core_idx = %core_idx,
						relay_parent = ?relay_parent,
						"core is occupied. Keep going.",
					);
					continue;
				}
				CoreState::Free => {
					tracing::trace!(
						target: LOG_TARGET,
						core_idx = %core_idx,
						"core is free. Keep going.",
					);
					continue
				}
			};

			if scheduled_core.para_id != config.para_id {
				tracing::trace!(
					target: LOG_TARGET,
					core_idx = %core_idx,
					relay_parent = ?relay_parent,
					our_para = %config.para_id,
					their_para = %scheduled_core.para_id,
					"core is not assigned to our para. Keep going.",
				);
				continue;
			}

			// we get validation data synchronously for each core instead of
			// within the subtask loop, because we have only a single mutable handle to the
			// context, so the work can't really be distributed
			let validation_data = match request_persisted_validation_data_ctx(
				relay_parent,
				scheduled_core.para_id,
				assumption,
				ctx,
			)
			.await?
			.await??
			{
				Some(v) => v,
				None => {
					tracing::trace!(
						target: LOG_TARGET,
						core_idx = %core_idx,
						relay_parent = ?relay_parent,
						our_para = %config.para_id,
						their_para = %scheduled_core.para_id,
						"validation data is not available",
					);
					continue
				}
			};

			let task_config = config.clone();
			let mut task_sender = sender.clone();
			let metrics = metrics.clone();
			ctx.spawn("collation generation collation builder", Box::pin(async move {
				let persisted_validation_data_hash = validation_data.hash();

				let (collation, result_sender) = match (task_config.collator)(relay_parent, &validation_data).await {
					Some(collation) => collation.into_inner(),
					None => {
						tracing::debug!(
							target: LOG_TARGET,
							para_id = %scheduled_core.para_id,
							"collator returned no collation on collate",
						);
						return
					}
				};

				let pov_hash = collation.proof_of_validity.hash();

				let signature_payload = collator_signature_payload(
					&relay_parent,
					&scheduled_core.para_id,
					&persisted_validation_data_hash,
					&pov_hash,
				);

				let erasure_root = match erasure_root(
					n_validators,
					validation_data,
					collation.proof_of_validity.clone(),
				) {
					Ok(erasure_root) => erasure_root,
					Err(err) => {
						tracing::error!(
							target: LOG_TARGET,
							para_id = %scheduled_core.para_id,
							err = ?err,
							"failed to calculate erasure root",
						);
						return
					}
				};

				let commitments = CandidateCommitments {
					upward_messages: collation.upward_messages,
					horizontal_messages: collation.horizontal_messages,
					new_validation_code: collation.new_validation_code,
					head_data: collation.head_data,
					processed_downward_messages: collation.processed_downward_messages,
					hrmp_watermark: collation.hrmp_watermark,
				};

				let ccr = CandidateReceipt {
					commitments_hash: commitments.hash(),
					descriptor: CandidateDescriptor {
						signature: task_config.key.sign(&signature_payload),
						para_id: scheduled_core.para_id,
						relay_parent,
						collator: task_config.key.public(),
						persisted_validation_data_hash,
						pov_hash,
						erasure_root,
						para_head: commitments.head_data.hash(),
					},
				};

				tracing::debug!(
					target: LOG_TARGET,
					candidate_hash = %ccr.hash(),
					?pov_hash,
					?relay_parent,
					para_id = %scheduled_core.para_id,
					"candidate is generated",
				);
				metrics.on_collation_generated();

				if let Err(err) = task_sender.send(AllMessages::CollatorProtocol(
					CollatorProtocolMessage::DistributeCollation(ccr, collation.proof_of_validity, result_sender)
				)).await {
					tracing::warn!(
						target: LOG_TARGET,
						para_id = %scheduled_core.para_id,
						err = ?err,
						"failed to send collation result",
					);
				}
			})).await?;
		}
	}

	Ok(())
}

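/// Compute the erasure root of the available data.
///
/// The `AvailableData` (persisted validation data plus PoV) is erasure-coded into one
/// chunk per validator, and the returned hash is the root of the Merkle trie built
/// over those chunks.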
#[tracing::instrument(level = "trace", fields(subsystem = LOG_TARGET))]
fn erasure_root(
	n_validators: usize,
	persisted_validation: PersistedValidationData,
	pov: PoV,
) -> crate::error::Result<Hash> {
	let available_data = AvailableData {
		validation_data: persisted_validation,
		pov: Arc::new(pov),
	};

	let chunks = polkadot_erasure_coding::obtain_chunks_v1(n_validators, &available_data)?;
	Ok(polkadot_erasure_coding::branches(&chunks).root())
}

#[derive(Clone)]
struct MetricsInner {
	collations_generated_total: prometheus::Counter<prometheus::U64>,
	new_activations_overall: prometheus::Histogram,
	new_activations_per_relay_parent: prometheus::Histogram,
	new_activations_per_availability_core: prometheus::Histogram,
}

/// CollationGenerationSubsystem metrics.
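///
/// When the inner value is `None` (e.g. when no Prometheus registry is available,
/// as in tests), every operation is a no-op.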
#[derive(Default, Clone)]
pub struct Metrics(Option<MetricsInner>);

impl Metrics {
	fn on_collation_generated(&self) {
		if let Some(metrics) = &self.0 {
			metrics.collations_generated_total.inc();
		}
	}

	/// Provide a timer for new activations which updates on drop.
	fn time_new_activations(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
		self.0.as_ref().map(|metrics| metrics.new_activations_overall.start_timer())
	}

	/// Provide a timer per relay parents which updates on drop.
	fn time_new_activations_relay_parent(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
		self.0.as_ref().map(|metrics| metrics.new_activations_per_relay_parent.start_timer())
	}

	/// Provide a timer per availability core which updates on drop.
	fn time_new_activations_availability_core(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
		self.0.as_ref().map(|metrics| metrics.new_activations_per_availability_core.start_timer())
	}
}

impl metrics::Metrics for Metrics {
	fn try_register(registry: &prometheus::Registry) -> Result<Self, prometheus::PrometheusError> {
		let metrics = MetricsInner {
			collations_generated_total: prometheus::register(
				prometheus::Counter::new(
					"parachain_collations_generated_total",
					"Number of collations generated."
				)?,
				registry,
			)?,
			new_activations_overall: prometheus::register(
				prometheus::Histogram::with_opts(
					prometheus::HistogramOpts::new(
						"parachain_collation_generation_new_activations",
						"Time spent within fn handle_new_activations",
					)
				)?,
				registry,
			)?,
			new_activations_per_relay_parent: prometheus::register(
				prometheus::Histogram::with_opts(
					prometheus::HistogramOpts::new(
						"parachain_collation_generation_per_relay_parent",
						"Time spent handling a particular relay parent within fn handle_new_activations"
					)
				)?,
				registry,
			)?,
			new_activations_per_availability_core: prometheus::register(
				prometheus::Histogram::with_opts(
					prometheus::HistogramOpts::new(
						"parachain_collation_generation_per_availability_core",
						"Time spent handling a particular availability core for a relay parent in fn handle_new_activations",
					)
				)?,
				registry,
			)?,
		};
		Ok(Metrics(Some(metrics)))
	}
}

#[cfg(test)]
mod tests {
	mod handle_new_activations {
		use super::super::*;
		use futures::{
			lock::Mutex,
			task::{Context as FuturesContext, Poll},
			Future,
		};
		use polkadot_node_primitives::{Collation, CollationResult};
		use polkadot_node_subsystem::messages::{
			AllMessages, RuntimeApiMessage, RuntimeApiRequest,
		};
		use polkadot_node_subsystem_test_helpers::{
			subsystem_test_harness, TestSubsystemContextHandle,
		};
		use polkadot_primitives::v1::{
			BlockData, BlockNumber, CollatorPair, Id as ParaId,
			PersistedValidationData, PoV, ScheduledCore,
		};
		use std::pin::Pin;

		fn test_collation() -> Collation {
			Collation {
				upward_messages: Default::default(),
				horizontal_messages: Default::default(),
				new_validation_code: Default::default(),
				head_data: Default::default(),
				proof_of_validity: PoV {
					block_data: BlockData(Vec::new()),
				},
				processed_downward_messages: Default::default(),
				hrmp_watermark: Default::default(),
			}
		}

		// A test future standing in for the collator fn's return value: effectively
		// `Box<dyn Future<Output = Option<CollationResult>> + Unpin + Send>`.
		struct TestCollator;

		impl Future for TestCollator {
			type Output = Option<CollationResult>;

			fn poll(self: Pin<&mut Self>, _cx: &mut FuturesContext) -> Poll<Self::Output> {
				Poll::Ready(Some(CollationResult { collation: test_collation(), result_sender: None }))
			}
		}

		impl Unpin for TestCollator {}

		fn test_config<Id: Into<ParaId>>(para_id: Id) -> Arc<CollationGenerationConfig> {
			Arc::new(CollationGenerationConfig {
				key: CollatorPair::generate().0,
				collator: Box::new(|_: Hash, _vd: &PersistedValidationData| {
					TestCollator.boxed()
				}),
				para_id: para_id.into(),
			})
		}

		fn scheduled_core_for<Id: Into<ParaId>>(para_id: Id) -> ScheduledCore {
			ScheduledCore {
				para_id: para_id.into(),
				collator: None,
			}
		}

		#[test]
		fn requests_availability_per_relay_parent() {
			let activated_hashes: Vec<Hash> = vec![
				[1; 32].into(),
				[4; 32].into(),
				[9; 32].into(),
				[16; 32].into(),
			];

			let requested_availability_cores = Arc::new(Mutex::new(Vec::new()));

			let overseer_requested_availability_cores = requested_availability_cores.clone();
			let overseer = |mut handle: TestSubsystemContextHandle<CollationGenerationMessage>| async move {
				loop {
					match handle.try_recv().await {
						None => break,
						Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(hash, RuntimeApiRequest::AvailabilityCores(tx)))) => {
							overseer_requested_availability_cores.lock().await.push(hash);
							tx.send(Ok(vec![])).unwrap();
						}
						Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(_hash, RuntimeApiRequest::Validators(tx)))) => {
							tx.send(Ok(vec![Default::default(); 3])).unwrap();
						}
						Some(msg) => panic!("didn't expect any other overseer requests given no availability cores; got {:?}", msg),
					}
				}
			};

			let (tx, _rx) = mpsc::channel(0);

			let subsystem_activated_hashes = activated_hashes.clone();
			subsystem_test_harness(overseer, |mut ctx| async move {
				handle_new_activations(
					test_config(123u32),
					subsystem_activated_hashes,
					&mut ctx,
					Metrics(None),
					&tx,
				)
				.await
				.unwrap();
			});

			let mut requested_availability_cores = Arc::try_unwrap(requested_availability_cores)
				.expect("overseer should have shut down by now")
				.into_inner();
			requested_availability_cores.sort();

			assert_eq!(requested_availability_cores, activated_hashes);
		}

		#[test]
		fn requests_validation_data_for_scheduled_matches() {
			let activated_hashes: Vec<Hash> = vec![
				Hash::repeat_byte(1),
				Hash::repeat_byte(4),
				Hash::repeat_byte(9),
				Hash::repeat_byte(16),
			];

			let requested_validation_data = Arc::new(Mutex::new(Vec::new()));

			let overseer_requested_validation_data = requested_validation_data.clone();
			let overseer = |mut handle: TestSubsystemContextHandle<CollationGenerationMessage>| async move {
				loop {
					match handle.try_recv().await {
						None => break,
						Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
							hash,
							RuntimeApiRequest::AvailabilityCores(tx),
						))) => {
							tx.send(Ok(vec![
								CoreState::Free,
								// this is weird, see explanation below
								CoreState::Scheduled(scheduled_core_for(
									(hash.as_fixed_bytes()[0] * 4) as u32,
								)),
								CoreState::Scheduled(scheduled_core_for(
									(hash.as_fixed_bytes()[0] * 5) as u32,
								)),
							]))
							.unwrap();
						}
						Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
							hash,
							RuntimeApiRequest::PersistedValidationData(
								_para_id,
								_occupied_core_assumption,
								tx,
							),
						))) => {
							overseer_requested_validation_data
								.lock()
								.await
								.push(hash);
							tx.send(Ok(Default::default())).unwrap();
						}
						Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
							_hash,
							RuntimeApiRequest::Validators(tx),
						))) => {
							tx.send(Ok(vec![Default::default(); 3])).unwrap();
						}
						Some(msg) => {
							panic!("didn't expect any other overseer requests; got {:?}", msg)
						}
					}
				}
			};

			let (tx, _rx) = mpsc::channel(0);

			subsystem_test_harness(overseer, |mut ctx| async move {
				handle_new_activations(test_config(16), activated_hashes, &mut ctx, Metrics(None), &tx)
					.await
					.unwrap();
			});

			let requested_validation_data = Arc::try_unwrap(requested_validation_data)
				.expect("overseer should have shut down by now")
				.into_inner();

			// the only requested validation data should be for the hash with repeat-byte 4:
			// each activated hash generates two scheduled cores, with para_ids of (byte * 4) and (byte * 5).
			// given that the test configuration has a para_id of 16, only the repeat-byte-4 hash
			// can produce it (4 * 4 = 16).
			assert_eq!(requested_validation_data, vec![[4; 32].into()]);
		}

		#[test]
		fn sends_distribute_collation_message() {
			let activated_hashes: Vec<Hash> = vec![
				Hash::repeat_byte(1),
				Hash::repeat_byte(4),
				Hash::repeat_byte(9),
				Hash::repeat_byte(16),
			];

			let overseer = |mut handle: TestSubsystemContextHandle<CollationGenerationMessage>| async move {
				loop {
					match handle.try_recv().await {
						None => break,
						Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
							hash,
							RuntimeApiRequest::AvailabilityCores(tx),
						))) => {
							tx.send(Ok(vec![
								CoreState::Free,
								// this is weird, see explanation below
								CoreState::Scheduled(scheduled_core_for(
									(hash.as_fixed_bytes()[0] * 4) as u32,
								)),
								CoreState::Scheduled(scheduled_core_for(
									(hash.as_fixed_bytes()[0] * 5) as u32,
								)),
							]))
							.unwrap();
						}
						Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
							_hash,
							RuntimeApiRequest::PersistedValidationData(
								_para_id,
								_occupied_core_assumption,
								tx,
							),
						))) => {
							tx.send(Ok(Some(Default::default()))).unwrap();
						}
						Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
							_hash,
							RuntimeApiRequest::Validators(tx),
						))) => {
							tx.send(Ok(vec![Default::default(); 3])).unwrap();
						}
						Some(msg) => {
							panic!("didn't expect any other overseer requests; got {:?}", msg)
						}
					}
				}
			};

			let config = test_config(16);
			let subsystem_config = config.clone();

			let (tx, rx) = mpsc::channel(0);

			// empty vec doesn't allocate on the heap, so it's ok we throw it away
			let sent_messages = Arc::new(Mutex::new(Vec::new()));
			let subsystem_sent_messages = sent_messages.clone();
			subsystem_test_harness(overseer, |mut ctx| async move {
				handle_new_activations(subsystem_config, activated_hashes, &mut ctx, Metrics(None), &tx)
					.await
					.unwrap();

				std::mem::drop(tx);

				// collect all sent messages
				*subsystem_sent_messages.lock().await = rx.collect().await;
			});

			let sent_messages = Arc::try_unwrap(sent_messages)
				.expect("subsystem should have shut down by now")
				.into_inner();

			// we expect a single message to be sent, containing a candidate receipt.
			// we don't care too much about the commitments_hash right now, but let's ensure that we've calculated the
			// correct descriptor
			let expect_pov_hash = test_collation().proof_of_validity.hash();
			let expect_validation_data_hash
				= PersistedValidationData::<BlockNumber>::default().hash();
			let expect_relay_parent = Hash::repeat_byte(4);
			let expect_payload = collator_signature_payload(
				&expect_relay_parent,
				&config.para_id,
				&expect_validation_data_hash,
				&expect_pov_hash,
			);
			let expect_descriptor = CandidateDescriptor {
				signature: config.key.sign(&expect_payload),
				para_id: config.para_id,
				relay_parent: expect_relay_parent,
				collator: config.key.public(),
				persisted_validation_data_hash: expect_validation_data_hash,
				pov_hash: expect_pov_hash,
				erasure_root: Default::default(), // this isn't something we're checking right now
				para_head: test_collation().head_data.hash(),
			};

			assert_eq!(sent_messages.len(), 1);
			match &sent_messages[0] {
				AllMessages::CollatorProtocol(CollatorProtocolMessage::DistributeCollation(
					CandidateReceipt { descriptor, .. },
					_pov,
					..
				)) => {
					// signature generation is non-deterministic, so we can't just assert that the
					// expected descriptor is correct. What we can do is validate that the produced
					// descriptor has a valid signature, then just copy in the generated signature
					// and check the rest of the fields for equality.
					assert!(CollatorPair::verify(
						&descriptor.signature,
						&collator_signature_payload(
							&descriptor.relay_parent,
							&descriptor.para_id,
							&descriptor.persisted_validation_data_hash,
							&descriptor.pov_hash,
						)
						.as_ref(),
						&descriptor.collator,
					));
					let expect_descriptor = {
						let mut expect_descriptor = expect_descriptor;
						expect_descriptor.signature = descriptor.signature.clone();
						expect_descriptor.erasure_root = descriptor.erasure_root.clone();
						expect_descriptor
					};
					assert_eq!(descriptor, &expect_descriptor);
				}
				_ => panic!("received wrong message type"),
			}
		}
	}
}