lib.rs 20.8 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

//! Implements the Runtime API Subsystem
//!
//! This provides a clean, ownerless wrapper around the parachain-related runtime APIs. This crate
//! can also be used to cache responses from heavy runtime APIs.
21

22
23
24
#![deny(unused_crate_dependencies)]
#![warn(missing_docs)]

25
26
27
use polkadot_subsystem::{
	Subsystem, SpawnedSubsystem, SubsystemResult, SubsystemContext,
	FromOverseer, OverseerSignal,
28
29
30
31
	messages::{
		RuntimeApiMessage, RuntimeApiRequest as Request,
	},
	errors::RuntimeApiError,
32
};
33
34
use polkadot_node_subsystem_util::{
	metrics::{self, prometheus},
35
36
};
use polkadot_primitives::v1::{Block, BlockId, Hash, ParachainHost};
37
use std::sync::Arc;
38
39
40
41
42
43

use sp_api::{ProvideRuntimeApi};

use futures::prelude::*;

/// The `RuntimeApiSubsystem`. See module docs for more details.
44
pub struct RuntimeApiSubsystem<Client> {
45
	client: Arc<Client>,
46
47
	metrics: Metrics,
}
48
49

impl<Client> RuntimeApiSubsystem<Client> {
50
	/// Create a new Runtime API subsystem wrapping the given client and metrics.
51
	pub fn new(client: Arc<Client>, metrics: Metrics) -> Self {
52
		RuntimeApiSubsystem { client, metrics }
53
54
55
56
	}
}

impl<Client, Context> Subsystem<Context> for RuntimeApiSubsystem<Client> where
57
	Client: ProvideRuntimeApi<Block> + Send + 'static + Sync,
58
59
60
61
62
	Client::Api: ParachainHost<Block>,
	Context: SubsystemContext<Message = RuntimeApiMessage>
{
	fn start(self, ctx: Context) -> SpawnedSubsystem {
		SpawnedSubsystem {
63
			future: run(ctx, self).boxed(),
Andronik Ordian's avatar
Andronik Ordian committed
64
			name: "runtime-api-subsystem",
65
66
67
68
69
70
		}
	}
}

async fn run<Client>(
	mut ctx: impl SubsystemContext<Message = RuntimeApiMessage>,
71
	subsystem: RuntimeApiSubsystem<Client>,
72
73
74
75
76
77
78
79
80
81
82
) -> SubsystemResult<()> where
	Client: ProvideRuntimeApi<Block>,
	Client::Api: ParachainHost<Block>,
{
	loop {
		match ctx.recv().await? {
			FromOverseer::Signal(OverseerSignal::Conclude) => return Ok(()),
			FromOverseer::Signal(OverseerSignal::ActiveLeaves(_)) => {},
			FromOverseer::Signal(OverseerSignal::BlockFinalized(_)) => {},
			FromOverseer::Communication { msg } => match msg {
				RuntimeApiMessage::Request(relay_parent, request) => make_runtime_api_request(
83
					&*subsystem.client,
84
					&subsystem.metrics,
85
86
87
88
89
90
91
92
93
94
					relay_parent,
					request,
				),
			}
		}
	}
}

fn make_runtime_api_request<Client>(
	client: &Client,
95
	metrics: &Metrics,
96
97
98
99
100
101
102
103
104
105
106
107
	relay_parent: Hash,
	request: Request,
) where
	Client: ProvideRuntimeApi<Block>,
	Client::Api: ParachainHost<Block>,
{
	macro_rules! query {
		($api_name:ident ($($param:expr),*), $sender:expr) => {{
			let sender = $sender;
			let api = client.runtime_api();
			let res = api.$api_name(&BlockId::Hash(relay_parent), $($param),*)
				.map_err(|e| RuntimeApiError::from(format!("{:?}", e)));
108
			metrics.on_request(res.is_ok());
109
110
111
112
113
114
115
116
			let _ = sender.send(res);
		}}
	}

	match request {
		Request::Validators(sender) => query!(validators(), sender),
		Request::ValidatorGroups(sender) => query!(validator_groups(), sender),
		Request::AvailabilityCores(sender) => query!(availability_cores(), sender),
117
118
119
120
		Request::PersistedValidationData(para, assumption, sender) =>
			query!(persisted_validation_data(para, assumption), sender),
		Request::FullValidationData(para, assumption, sender) =>
			query!(full_validation_data(para, assumption), sender),
121
122
		Request::CheckValidationOutputs(para, commitments, sender) =>
			query!(check_validation_outputs(para, commitments), sender),
123
124
125
126
127
128
		Request::SessionIndexForChild(sender) => query!(session_index_for_child(), sender),
		Request::ValidationCode(para, assumption, sender) =>
			query!(validation_code(para, assumption), sender),
		Request::CandidatePendingAvailability(para, sender) =>
			query!(candidate_pending_availability(para), sender),
		Request::CandidateEvents(sender) => query!(candidate_events(), sender),
129
		Request::ValidatorDiscovery(ids, sender) => query!(validator_discovery(ids), sender),
130
		Request::DmqContents(id, sender) => query!(dmq_contents(id), sender),
131
132
133
	}
}

134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
#[derive(Clone)]
struct MetricsInner {
	chain_api_requests: prometheus::CounterVec<prometheus::U64>,
}

/// Runtime API metrics.
#[derive(Default, Clone)]
pub struct Metrics(Option<MetricsInner>);

impl Metrics {
	fn on_request(&self, succeeded: bool) {
		if let Some(metrics) = &self.0 {
			if succeeded {
				metrics.chain_api_requests.with_label_values(&["succeeded"]).inc();
			} else {
				metrics.chain_api_requests.with_label_values(&["failed"]).inc();
			}
		}
	}
}

impl metrics::Metrics for Metrics {
	fn try_register(registry: &prometheus::Registry) -> Result<Self, prometheus::PrometheusError> {
		let metrics = MetricsInner {
			chain_api_requests: prometheus::register(
				prometheus::CounterVec::new(
					prometheus::Opts::new(
						"parachain_runtime_api_requests_total",
						"Number of Runtime API requests served.",
					),
164
					&["success"],
165
166
167
168
169
170
171
172
				)?,
				registry,
			)?,
		};
		Ok(Metrics(Some(metrics)))
	}
}

173
174
175
176
177
#[cfg(test)]
mod tests {
	use super::*;

	use polkadot_primitives::v1::{
178
179
		ValidatorId, ValidatorIndex, GroupRotationInfo, CoreState, PersistedValidationData,
		Id as ParaId, OccupiedCoreAssumption, ValidationData, SessionIndex, ValidationCode,
180
		CommittedCandidateReceipt, CandidateEvent, AuthorityDiscoveryId, InboundDownwardMessage,
181
	};
182
	use polkadot_node_subsystem_test_helpers as test_helpers;
183
184
185
186
187
188
189
190
191
192
	use sp_core::testing::TaskExecutor;

	use std::collections::HashMap;
	use futures::channel::oneshot;

	#[derive(Default, Clone)]
	struct MockRuntimeApi {
		validators: Vec<ValidatorId>,
		validator_groups: Vec<Vec<ValidatorIndex>>,
		availability_cores: Vec<CoreState>,
193
		validation_data: HashMap<ParaId, ValidationData>,
194
195
		session_index_for_child: SessionIndex,
		validation_code: HashMap<ParaId, ValidationCode>,
196
		validation_outputs_results: HashMap<ParaId, bool>,
197
198
		candidate_pending_availability: HashMap<ParaId, CommittedCandidateReceipt>,
		candidate_events: Vec<CandidateEvent>,
199
		dmq_contents: HashMap<ParaId, Vec<InboundDownwardMessage>>,
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
	}

	impl ProvideRuntimeApi<Block> for MockRuntimeApi {
		type Api = Self;

		fn runtime_api<'a>(&'a self) -> sp_api::ApiRef<'a, Self::Api> {
			self.clone().into()
		}
	}

	sp_api::mock_impl_runtime_apis! {
		impl ParachainHost<Block> for MockRuntimeApi {
			type Error = String;

			fn validators(&self) -> Vec<ValidatorId> {
				self.validators.clone()
			}

			fn validator_groups(&self) -> (Vec<Vec<ValidatorIndex>>, GroupRotationInfo) {
				(
					self.validator_groups.clone(),
					GroupRotationInfo {
						session_start_block: 1,
						group_rotation_frequency: 100,
						now: 10,
					},
				)
			}

			fn availability_cores(&self) -> Vec<CoreState> {
				self.availability_cores.clone()
			}

233
234
235
236
237
238
			fn persisted_validation_data(
				&self,
				para: ParaId,
				_assumption: OccupiedCoreAssumption,
			) -> Option<PersistedValidationData> {
				self.validation_data.get(&para).map(|l| l.persisted.clone())
239
240
			}

241
			fn full_validation_data(
242
243
244
				&self,
				para: ParaId,
				_assumption: OccupiedCoreAssumption,
245
246
			) -> Option<ValidationData> {
				self.validation_data.get(&para).map(|l| l.clone())
247
248
			}

249
250
251
252
253
254
255
256
257
258
259
260
261
			fn check_validation_outputs(
				&self,
				para_id: ParaId,
				_commitments: polkadot_primitives::v1::ValidationOutputs,
			) -> bool {
				self.validation_outputs_results
					.get(&para_id)
					.cloned()
					.expect(
						"`check_validation_outputs` called but the expected result hasn't been supplied"
					)
			}

262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
			fn session_index_for_child(&self) -> SessionIndex {
				self.session_index_for_child.clone()
			}

			fn validation_code(
				&self,
				para: ParaId,
				_assumption: OccupiedCoreAssumption,
			) -> Option<ValidationCode> {
				self.validation_code.get(&para).map(|c| c.clone())
			}

			fn candidate_pending_availability(
				&self,
				para: ParaId,
			) -> Option<CommittedCandidateReceipt> {
				self.candidate_pending_availability.get(&para).map(|c| c.clone())
			}

			fn candidate_events(&self) -> Vec<CandidateEvent> {
				self.candidate_events.clone()
			}
284
285
286
287

			fn validator_discovery(ids: Vec<ValidatorId>) -> Vec<Option<AuthorityDiscoveryId>> {
				vec![None; ids.len()]
			}
288
289
290
291
292
293
294

			fn dmq_contents(
				&self,
				recipient: ParaId,
			) -> Vec<polkadot_primitives::v1::InboundDownwardMessage> {
				self.dmq_contents.get(&recipient).map(|q| q.clone()).unwrap_or_default()
			}
295
296
297
298
299
300
		}
	}

	#[test]
	fn requests_validators() {
		let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
301
		let runtime_api = Arc::new(MockRuntimeApi::default());
302
303
		let relay_parent = [1; 32].into();

304
305
		let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None));
		let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
		let test_task = async move {
			let (tx, rx) = oneshot::channel();

			ctx_handle.send(FromOverseer::Communication {
				msg: RuntimeApiMessage::Request(relay_parent, Request::Validators(tx))
			}).await;

			assert_eq!(rx.await.unwrap().unwrap(), runtime_api.validators);

			ctx_handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
		};

		futures::executor::block_on(future::join(subsystem_task, test_task));
	}

	#[test]
	fn requests_validator_groups() {
		let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
324
		let runtime_api = Arc::new(MockRuntimeApi::default());
325
326
		let relay_parent = [1; 32].into();

327
328
		let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None));
		let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
		let test_task = async move {
			let (tx, rx) = oneshot::channel();

			ctx_handle.send(FromOverseer::Communication {
				msg: RuntimeApiMessage::Request(relay_parent, Request::ValidatorGroups(tx))
			}).await;

			assert_eq!(rx.await.unwrap().unwrap().0, runtime_api.validator_groups);

			ctx_handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
		};

		futures::executor::block_on(future::join(subsystem_task, test_task));
	}

	#[test]
	fn requests_availability_cores() {
		let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
347
		let runtime_api = Arc::new(MockRuntimeApi::default());
348
349
		let relay_parent = [1; 32].into();

350
351
		let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None));
		let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
		let test_task = async move {
			let (tx, rx) = oneshot::channel();

			ctx_handle.send(FromOverseer::Communication {
				msg: RuntimeApiMessage::Request(relay_parent, Request::AvailabilityCores(tx))
			}).await;

			assert_eq!(rx.await.unwrap().unwrap(), runtime_api.availability_cores);

			ctx_handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
		};

		futures::executor::block_on(future::join(subsystem_task, test_task));
	}

	#[test]
368
	fn requests_persisted_validation_data() {
369
		let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
370
		let mut runtime_api = Arc::new(MockRuntimeApi::default());
371
		let relay_parent = [1; 32].into();
372
373
374
		let para_a = 5.into();
		let para_b = 6.into();

375
		Arc::get_mut(&mut runtime_api).unwrap().validation_data.insert(para_a, Default::default());
376

377
378
		let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None));
		let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
379
380
381
382
		let test_task = async move {
			let (tx, rx) = oneshot::channel();

			ctx_handle.send(FromOverseer::Communication {
383
384
385
386
				msg: RuntimeApiMessage::Request(
					relay_parent,
					Request::PersistedValidationData(para_a, OccupiedCoreAssumption::Included, tx)
				),
387
388
			}).await;

389
390
391
392
393
394
395
396
397
398
399
			assert_eq!(rx.await.unwrap().unwrap(), Some(Default::default()));

			let (tx, rx) = oneshot::channel();
			ctx_handle.send(FromOverseer::Communication {
				msg: RuntimeApiMessage::Request(
					relay_parent,
					Request::PersistedValidationData(para_b, OccupiedCoreAssumption::Included, tx)
				),
			}).await;

			assert_eq!(rx.await.unwrap().unwrap(), None);
400
401
402
403
404
405
406
407

			ctx_handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
		};

		futures::executor::block_on(future::join(subsystem_task, test_task));
	}

	#[test]
408
	fn requests_full_validation_data() {
409
		let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
410
		let mut runtime_api = Arc::new(MockRuntimeApi::default());
411
412
413
414
		let relay_parent = [1; 32].into();
		let para_a = 5.into();
		let para_b = 6.into();

415
		Arc::get_mut(&mut runtime_api).unwrap().validation_data.insert(para_a, Default::default());
416

417
418
		let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None));
		let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
419
420
421
422
423
424
		let test_task = async move {
			let (tx, rx) = oneshot::channel();

			ctx_handle.send(FromOverseer::Communication {
				msg: RuntimeApiMessage::Request(
					relay_parent,
425
					Request::FullValidationData(para_a, OccupiedCoreAssumption::Included, tx)
426
427
428
429
430
431
432
433
434
				),
			}).await;

			assert_eq!(rx.await.unwrap().unwrap(), Some(Default::default()));

			let (tx, rx) = oneshot::channel();
			ctx_handle.send(FromOverseer::Communication {
				msg: RuntimeApiMessage::Request(
					relay_parent,
435
					Request::FullValidationData(para_b, OccupiedCoreAssumption::Included, tx)
436
437
438
439
440
441
442
443
444
445
446
				),
			}).await;

			assert_eq!(rx.await.unwrap().unwrap(), None);

			ctx_handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
		};

		futures::executor::block_on(future::join(subsystem_task, test_task));
	}

447
448
449
450
451
452
453
454
455
456
457
458
	#[test]
	fn requests_check_validation_outputs() {
		let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
		let mut runtime_api = MockRuntimeApi::default();
		let relay_parent = [1; 32].into();
		let para_a = 5.into();
		let para_b = 6.into();
		let commitments = polkadot_primitives::v1::ValidationOutputs::default();

		runtime_api.validation_outputs_results.insert(para_a, false);
		runtime_api.validation_outputs_results.insert(para_b, true);

459
460
		let runtime_api = Arc::new(runtime_api);

461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
		let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None));
		let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
		let test_task = async move {
			let (tx, rx) = oneshot::channel();

			ctx_handle.send(FromOverseer::Communication {
				msg: RuntimeApiMessage::Request(
					relay_parent,
					Request::CheckValidationOutputs(
						para_a,
						commitments.clone(),
						tx,
					),
				)
			}).await;
			assert_eq!(
				rx.await.unwrap().unwrap(),
				runtime_api.validation_outputs_results[&para_a],
			);

			let (tx, rx) = oneshot::channel();
			ctx_handle.send(FromOverseer::Communication {
				msg: RuntimeApiMessage::Request(
					relay_parent,
					Request::CheckValidationOutputs(
						para_b,
						commitments,
						tx,
					),
				)
			}).await;
			assert_eq!(
				rx.await.unwrap().unwrap(),
				runtime_api.validation_outputs_results[&para_b],
			);

			ctx_handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
		};

		futures::executor::block_on(future::join(subsystem_task, test_task));
	}

503
504
505
	#[test]
	fn requests_session_index_for_child() {
		let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
506
		let runtime_api = Arc::new(MockRuntimeApi::default());
507
508
		let relay_parent = [1; 32].into();

509
510
		let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None));
		let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
		let test_task = async move {
			let (tx, rx) = oneshot::channel();

			ctx_handle.send(FromOverseer::Communication {
				msg: RuntimeApiMessage::Request(relay_parent, Request::SessionIndexForChild(tx))
			}).await;

			assert_eq!(rx.await.unwrap().unwrap(), runtime_api.session_index_for_child);

			ctx_handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
		};

		futures::executor::block_on(future::join(subsystem_task, test_task));
	}

	#[test]
	fn requests_validation_code() {
		let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
529
		let mut runtime_api = Arc::new(MockRuntimeApi::default());
530
531
532
533
		let relay_parent = [1; 32].into();
		let para_a = 5.into();
		let para_b = 6.into();

534
		Arc::get_mut(&mut runtime_api).unwrap().validation_code.insert(para_a, Default::default());
535

536
537
		let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None));
		let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
		let test_task = async move {
			let (tx, rx) = oneshot::channel();

			ctx_handle.send(FromOverseer::Communication {
				msg: RuntimeApiMessage::Request(
					relay_parent,
					Request::ValidationCode(para_a, OccupiedCoreAssumption::Included, tx)
				),
			}).await;

			assert_eq!(rx.await.unwrap().unwrap(), Some(Default::default()));

			let (tx, rx) = oneshot::channel();
			ctx_handle.send(FromOverseer::Communication {
				msg: RuntimeApiMessage::Request(
					relay_parent,
					Request::ValidationCode(para_b, OccupiedCoreAssumption::Included, tx)
				),
			}).await;

			assert_eq!(rx.await.unwrap().unwrap(), None);

			ctx_handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
		};

		futures::executor::block_on(future::join(subsystem_task, test_task));
	}

	#[test]
	fn requests_candidate_pending_availability() {
		let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
		let mut runtime_api = MockRuntimeApi::default();
		let relay_parent = [1; 32].into();
		let para_a = 5.into();
		let para_b = 6.into();

		runtime_api.candidate_pending_availability.insert(para_a, Default::default());

576
577
		let runtime_api = Arc::new(runtime_api);

578
579
		let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None));
		let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
		let test_task = async move {
			let (tx, rx) = oneshot::channel();

			ctx_handle.send(FromOverseer::Communication {
				msg: RuntimeApiMessage::Request(
					relay_parent,
					Request::CandidatePendingAvailability(para_a, tx),
				)
			}).await;

			assert_eq!(rx.await.unwrap().unwrap(), Some(Default::default()));

			let (tx, rx) = oneshot::channel();

			ctx_handle.send(FromOverseer::Communication {
				msg: RuntimeApiMessage::Request(
					relay_parent,
					Request::CandidatePendingAvailability(para_b, tx),
				)
			}).await;

			assert_eq!(rx.await.unwrap().unwrap(), None);

			ctx_handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
		};

		futures::executor::block_on(future::join(subsystem_task, test_task));
	}

	#[test]
	fn requests_candidate_events() {
		let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
612
		let runtime_api = Arc::new(MockRuntimeApi::default());
613
614
		let relay_parent = [1; 32].into();

615
616
		let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None));
		let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
617
618
619
620
621
622
623
624
625
626
627
628
629
630
		let test_task = async move {
			let (tx, rx) = oneshot::channel();

			ctx_handle.send(FromOverseer::Communication {
				msg: RuntimeApiMessage::Request(relay_parent, Request::CandidateEvents(tx))
			}).await;

			assert_eq!(rx.await.unwrap().unwrap(), runtime_api.candidate_events);

			ctx_handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
		};

		futures::executor::block_on(future::join(subsystem_task, test_task));
	}
631
632
633
634

	#[test]
	fn requests_dmq_contents() {
		let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
635

636
637
638
639
		let relay_parent = [1; 32].into();
		let para_a = 5.into();
		let para_b = 6.into();

640
641
642
643
644
645
646
647
648
649
650
651
652
653
		let runtime_api = Arc::new({
			let mut runtime_api = MockRuntimeApi::default();

			runtime_api.dmq_contents.insert(para_a, vec![]);
			runtime_api.dmq_contents.insert(
				para_b,
				vec![InboundDownwardMessage {
					sent_at: 228,
					msg: b"Novus Ordo Seclorum".to_vec(),
				}],
			);

			runtime_api
		});
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686

		let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None));
		let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
		let test_task = async move {
			let (tx, rx) = oneshot::channel();
			ctx_handle
				.send(FromOverseer::Communication {
					msg: RuntimeApiMessage::Request(relay_parent, Request::DmqContents(para_a, tx)),
				})
				.await;
			assert_eq!(rx.await.unwrap().unwrap(), vec![]);

			let (tx, rx) = oneshot::channel();
			ctx_handle
				.send(FromOverseer::Communication {
					msg: RuntimeApiMessage::Request(relay_parent, Request::DmqContents(para_b, tx)),
				})
				.await;
			assert_eq!(
				rx.await.unwrap().unwrap(),
				vec![InboundDownwardMessage {
					sent_at: 228,
					msg: b"Novus Ordo Seclorum".to_vec(),
				}]
			);

			ctx_handle
				.send(FromOverseer::Signal(OverseerSignal::Conclude))
				.await;
		};
		futures::executor::block_on(future::join(subsystem_task, test_task));
	}

687
}