lib.rs 31.8 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

//! Implements the Runtime API Subsystem
//!
//! This provides a clean, ownerless wrapper around the parachain-related runtime APIs. This crate
//! can also be used to cache responses from heavy runtime APIs.
21

22
23
24
#![deny(unused_crate_dependencies)]
#![warn(missing_docs)]

25
26
27
use polkadot_subsystem::{
	Subsystem, SpawnedSubsystem, SubsystemResult, SubsystemContext,
	FromOverseer, OverseerSignal,
28
29
30
31
	messages::{
		RuntimeApiMessage, RuntimeApiRequest as Request,
	},
	errors::RuntimeApiError,
32
};
33
use polkadot_node_subsystem_util::metrics::{self, prometheus};
34
35
use polkadot_primitives::v1::{Block, BlockId, Hash, ParachainHost};

36
37
use sp_api::ProvideRuntimeApi;
use sp_core::traits::SpawnNamed;
38

39
40
use futures::{prelude::*, stream::FuturesUnordered, channel::oneshot, select};
use std::{sync::Arc, collections::VecDeque, pin::Pin};
41

42
const LOG_TARGET: &str = "runtime_api";
43

44
45
46
47
48
49
/// The number of maximum runtime api requests can be executed in parallel. Further requests will be buffered.
const MAX_PARALLEL_REQUESTS: usize = 4;

/// The name of the blocking task that executes a runtime api request.
const API_REQUEST_TASK_NAME: &str = "polkadot-runtime-api-request";

50
/// The `RuntimeApiSubsystem`. See module docs for more details.
51
pub struct RuntimeApiSubsystem<Client> {
52
	client: Arc<Client>,
53
	metrics: Metrics,
54
	spawn_handle: Box<dyn SpawnNamed>,
55
56
57
58
	/// If there are [`MAX_PARALLEL_REQUESTS`] requests being executed, we buffer them in here until they can be executed.
	waiting_requests: VecDeque<(Pin<Box<dyn Future<Output = ()> + Send>>, oneshot::Receiver<()>)>,
	/// All the active runtime api requests that are currently being executed.
	active_requests: FuturesUnordered<oneshot::Receiver<()>>,
59
}
60
61

impl<Client> RuntimeApiSubsystem<Client> {
62
	/// Create a new Runtime API subsystem wrapping the given client and metrics.
63
	pub fn new(client: Arc<Client>, metrics: Metrics, spawn_handle: impl SpawnNamed + 'static) -> Self {
64
65
66
67
68
69
70
		RuntimeApiSubsystem {
			client,
			metrics,
			spawn_handle: Box::new(spawn_handle),
			waiting_requests: Default::default(),
			active_requests: Default::default(),
		}
71
72
73
74
	}
}

impl<Client, Context> Subsystem<Context> for RuntimeApiSubsystem<Client> where
75
	Client: ProvideRuntimeApi<Block> + Send + 'static + Sync,
76
77
78
79
80
	Client::Api: ParachainHost<Block>,
	Context: SubsystemContext<Message = RuntimeApiMessage>
{
	fn start(self, ctx: Context) -> SpawnedSubsystem {
		SpawnedSubsystem {
81
			future: run(ctx, self).boxed(),
Andronik Ordian's avatar
Andronik Ordian committed
82
			name: "runtime-api-subsystem",
83
84
85
86
		}
	}
}

87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
impl<Client> RuntimeApiSubsystem<Client> where
	Client: ProvideRuntimeApi<Block> + Send + 'static + Sync,
	Client::Api: ParachainHost<Block>,
{
	/// Spawn a runtime api request.
	///
	/// If there are already [`MAX_PARALLEL_REQUESTS`] requests being executed, the request will be buffered.
	fn spawn_request(&mut self, relay_parent: Hash, request: Request) {
		let client = self.client.clone();
		let metrics = self.metrics.clone();
		let (sender, receiver) = oneshot::channel();

		let request = async move {
			make_runtime_api_request(
				client,
				metrics,
				relay_parent,
				request,
			);
			let _ = sender.send(());
		}.boxed();

		if self.active_requests.len() >= MAX_PARALLEL_REQUESTS {
			self.waiting_requests.push_back((request, receiver));

			if self.waiting_requests.len() > MAX_PARALLEL_REQUESTS * 10 {
				tracing::warn!(
					target: LOG_TARGET,
					"{} runtime api requests waiting to be executed.",
					self.waiting_requests.len(),
				)
			}
		} else {
			self.spawn_handle.spawn_blocking(API_REQUEST_TASK_NAME, request);
			self.active_requests.push(receiver);
		}
	}

	/// Poll the active runtime api requests.
	async fn poll_requests(&mut self) {
		// If there are no active requests, this future should be pending forever.
		if self.active_requests.len() == 0 {
			return futures::pending!()
		}

		// If there are active requests, this will always resolve to `Some(_)` when a request is finished.
		let _ = self.active_requests.next().await;

		if let Some((req, recv)) = self.waiting_requests.pop_front() {
			self.spawn_handle.spawn_blocking(API_REQUEST_TASK_NAME, req);
			self.active_requests.push(recv);
		}
	}
}

142
#[tracing::instrument(skip(ctx, subsystem), fields(subsystem = LOG_TARGET))]
143
144
async fn run<Client>(
	mut ctx: impl SubsystemContext<Message = RuntimeApiMessage>,
145
	mut subsystem: RuntimeApiSubsystem<Client>,
146
) -> SubsystemResult<()> where
147
	Client: ProvideRuntimeApi<Block> + Send + Sync + 'static,
148
149
150
	Client::Api: ParachainHost<Block>,
{
	loop {
151
152
153
154
		select! {
			req = ctx.recv().fuse() => match req? {
				FromOverseer::Signal(OverseerSignal::Conclude) => return Ok(()),
				FromOverseer::Signal(OverseerSignal::ActiveLeaves(_)) => {},
155
				FromOverseer::Signal(OverseerSignal::BlockFinalized(..)) => {},
156
157
158
159
160
161
162
				FromOverseer::Communication { msg } => match msg {
					RuntimeApiMessage::Request(relay_parent, request) => {
						subsystem.spawn_request(relay_parent, request);
					},
				}
			},
			_ = subsystem.poll_requests().fuse() => {},
163
164
165
166
		}
	}
}

167
#[tracing::instrument(level = "trace", skip(client, metrics), fields(subsystem = LOG_TARGET))]
168
fn make_runtime_api_request<Client>(
169
170
	client: Arc<Client>,
	metrics: Metrics,
171
172
173
174
175
176
	relay_parent: Hash,
	request: Request,
) where
	Client: ProvideRuntimeApi<Block>,
	Client::Api: ParachainHost<Block>,
{
177
178
	let _timer = metrics.time_make_runtime_api_request();

179
180
181
182
183
184
	macro_rules! query {
		($api_name:ident ($($param:expr),*), $sender:expr) => {{
			let sender = $sender;
			let api = client.runtime_api();
			let res = api.$api_name(&BlockId::Hash(relay_parent), $($param),*)
				.map_err(|e| RuntimeApiError::from(format!("{:?}", e)));
185
			metrics.on_request(res.is_ok());
186
187
188
189
190
191
192
193
			let _ = sender.send(res);
		}}
	}

	match request {
		Request::Validators(sender) => query!(validators(), sender),
		Request::ValidatorGroups(sender) => query!(validator_groups(), sender),
		Request::AvailabilityCores(sender) => query!(availability_cores(), sender),
194
195
196
197
		Request::PersistedValidationData(para, assumption, sender) =>
			query!(persisted_validation_data(para, assumption), sender),
		Request::FullValidationData(para, assumption, sender) =>
			query!(full_validation_data(para, assumption), sender),
198
199
		Request::CheckValidationOutputs(para, commitments, sender) =>
			query!(check_validation_outputs(para, commitments), sender),
200
201
202
		Request::SessionIndexForChild(sender) => query!(session_index_for_child(), sender),
		Request::ValidationCode(para, assumption, sender) =>
			query!(validation_code(para, assumption), sender),
203
204
		Request::HistoricalValidationCode(para, at, sender) =>
			query!(historical_validation_code(para, at), sender),
205
206
207
		Request::CandidatePendingAvailability(para, sender) =>
			query!(candidate_pending_availability(para), sender),
		Request::CandidateEvents(sender) => query!(candidate_events(), sender),
208
		Request::SessionInfo(index, sender) => query!(session_info(index), sender),
209
		Request::DmqContents(id, sender) => query!(dmq_contents(id), sender),
Sergey Pepyakin's avatar
Sergey Pepyakin committed
210
		Request::InboundHrmpChannelsContents(id, sender) => query!(inbound_hrmp_channels_contents(id), sender),
211
212
213
	}
}

214
215
216
#[derive(Clone)]
struct MetricsInner {
	chain_api_requests: prometheus::CounterVec<prometheus::U64>,
217
	make_runtime_api_request: prometheus::Histogram,
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
}

/// Runtime API metrics.
#[derive(Default, Clone)]
pub struct Metrics(Option<MetricsInner>);

impl Metrics {
	fn on_request(&self, succeeded: bool) {
		if let Some(metrics) = &self.0 {
			if succeeded {
				metrics.chain_api_requests.with_label_values(&["succeeded"]).inc();
			} else {
				metrics.chain_api_requests.with_label_values(&["failed"]).inc();
			}
		}
	}
234
235
236
237
238

	/// Provide a timer for `make_runtime_api_request` which observes on drop.
	fn time_make_runtime_api_request(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
		self.0.as_ref().map(|metrics| metrics.make_runtime_api_request.start_timer())
	}
239
240
241
242
243
244
245
246
247
248
249
}

impl metrics::Metrics for Metrics {
	fn try_register(registry: &prometheus::Registry) -> Result<Self, prometheus::PrometheusError> {
		let metrics = MetricsInner {
			chain_api_requests: prometheus::register(
				prometheus::CounterVec::new(
					prometheus::Opts::new(
						"parachain_runtime_api_requests_total",
						"Number of Runtime API requests served.",
					),
250
					&["success"],
251
252
253
				)?,
				registry,
			)?,
254
255
256
257
258
259
260
261
262
			make_runtime_api_request: prometheus::register(
				prometheus::Histogram::with_opts(
					prometheus::HistogramOpts::new(
						"parachain_runtime_api_make_runtime_api_request",
						"Time spent within `runtime_api::make_runtime_api_request`",
					)
				)?,
				registry,
			)?,
263
264
265
266
267
		};
		Ok(Metrics(Some(metrics)))
	}
}

268
269
270
271
272
#[cfg(test)]
mod tests {
	use super::*;

	use polkadot_primitives::v1::{
273
274
		ValidatorId, ValidatorIndex, GroupRotationInfo, CoreState, PersistedValidationData,
		Id as ParaId, OccupiedCoreAssumption, ValidationData, SessionIndex, ValidationCode,
275
276
		CommittedCandidateReceipt, CandidateEvent, InboundDownwardMessage,
		BlockNumber, InboundHrmpMessage, SessionInfo,
277
	};
278
	use polkadot_node_subsystem_test_helpers as test_helpers;
279
	use sp_core::testing::TaskExecutor;
280
	use std::{collections::{HashMap, BTreeMap}, sync::{Arc, Mutex}};
281
282
283
284
285
286
287
	use futures::channel::oneshot;

	#[derive(Default, Clone)]
	struct MockRuntimeApi {
		validators: Vec<ValidatorId>,
		validator_groups: Vec<Vec<ValidatorIndex>>,
		availability_cores: Vec<CoreState>,
288
		availability_cores_wait: Arc<Mutex<()>>,
289
		validation_data: HashMap<ParaId, ValidationData>,
290
		session_index_for_child: SessionIndex,
291
		session_info: HashMap<SessionIndex, SessionInfo>,
292
		validation_code: HashMap<ParaId, ValidationCode>,
293
		historical_validation_code: HashMap<ParaId, Vec<(BlockNumber, ValidationCode)>>,
294
		validation_outputs_results: HashMap<ParaId, bool>,
295
296
		candidate_pending_availability: HashMap<ParaId, CommittedCandidateReceipt>,
		candidate_events: Vec<CandidateEvent>,
297
		dmq_contents: HashMap<ParaId, Vec<InboundDownwardMessage>>,
Sergey Pepyakin's avatar
Sergey Pepyakin committed
298
		hrmp_channels: HashMap<ParaId, BTreeMap<ParaId, Vec<InboundHrmpMessage>>>,
299
300
301
302
303
304
305
306
307
308
309
310
	}

	impl ProvideRuntimeApi<Block> for MockRuntimeApi {
		type Api = Self;

		fn runtime_api<'a>(&'a self) -> sp_api::ApiRef<'a, Self::Api> {
			self.clone().into()
		}
	}

	sp_api::mock_impl_runtime_apis! {
		impl ParachainHost<Block> for MockRuntimeApi {
311
			type Error = sp_api::ApiError;
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328

			fn validators(&self) -> Vec<ValidatorId> {
				self.validators.clone()
			}

			fn validator_groups(&self) -> (Vec<Vec<ValidatorIndex>>, GroupRotationInfo) {
				(
					self.validator_groups.clone(),
					GroupRotationInfo {
						session_start_block: 1,
						group_rotation_frequency: 100,
						now: 10,
					},
				)
			}

			fn availability_cores(&self) -> Vec<CoreState> {
329
				let _ = self.availability_cores_wait.lock().unwrap();
330
331
332
				self.availability_cores.clone()
			}

333
334
335
336
337
338
			fn persisted_validation_data(
				&self,
				para: ParaId,
				_assumption: OccupiedCoreAssumption,
			) -> Option<PersistedValidationData> {
				self.validation_data.get(&para).map(|l| l.persisted.clone())
339
340
			}

341
			fn full_validation_data(
342
343
344
				&self,
				para: ParaId,
				_assumption: OccupiedCoreAssumption,
345
346
			) -> Option<ValidationData> {
				self.validation_data.get(&para).map(|l| l.clone())
347
348
			}

349
350
351
			fn check_validation_outputs(
				&self,
				para_id: ParaId,
352
				_commitments: polkadot_primitives::v1::CandidateCommitments,
353
354
355
356
357
358
359
360
361
			) -> bool {
				self.validation_outputs_results
					.get(&para_id)
					.cloned()
					.expect(
						"`check_validation_outputs` called but the expected result hasn't been supplied"
					)
			}

362
363
364
365
			fn session_index_for_child(&self) -> SessionIndex {
				self.session_index_for_child.clone()
			}

366
367
368
369
			fn session_info(&self, index: SessionIndex) -> Option<SessionInfo> {
				self.session_info.get(&index).cloned()
			}

370
371
372
373
374
375
376
377
			fn validation_code(
				&self,
				para: ParaId,
				_assumption: OccupiedCoreAssumption,
			) -> Option<ValidationCode> {
				self.validation_code.get(&para).map(|c| c.clone())
			}

378
379
380
381
382
383
384
385
386
387
388
389
390
			fn historical_validation_code(
				&self,
				para: ParaId,
				at: BlockNumber,
			) -> Option<ValidationCode> {
				self.historical_validation_code.get(&para).and_then(|h_code| {
					h_code.iter()
						.take_while(|(changed_at, _)| changed_at <= &at)
						.last()
						.map(|(_, code)| code.clone())
				})
			}

391
392
393
394
395
396
397
398
399
400
			fn candidate_pending_availability(
				&self,
				para: ParaId,
			) -> Option<CommittedCandidateReceipt> {
				self.candidate_pending_availability.get(&para).map(|c| c.clone())
			}

			fn candidate_events(&self) -> Vec<CandidateEvent> {
				self.candidate_events.clone()
			}
401

402
403
404
			fn dmq_contents(
				&self,
				recipient: ParaId,
Sergey Pepyakin's avatar
Sergey Pepyakin committed
405
			) -> Vec<InboundDownwardMessage> {
406
407
				self.dmq_contents.get(&recipient).map(|q| q.clone()).unwrap_or_default()
			}
Sergey Pepyakin's avatar
Sergey Pepyakin committed
408
409
410
411
412
413
414

			fn inbound_hrmp_channels_contents(
				&self,
				recipient: ParaId
			) -> BTreeMap<ParaId, Vec<InboundHrmpMessage>> {
				self.hrmp_channels.get(&recipient).map(|q| q.clone()).unwrap_or_default()
			}
415
416
417
418
419
420
		}
	}

	#[test]
	fn requests_validators() {
		let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
421
		let runtime_api = Arc::new(MockRuntimeApi::default());
422
		let relay_parent = [1; 32].into();
423
		let spawner = sp_core::testing::TaskExecutor::new();
424

425
		let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner);
426
		let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
		let test_task = async move {
			let (tx, rx) = oneshot::channel();

			ctx_handle.send(FromOverseer::Communication {
				msg: RuntimeApiMessage::Request(relay_parent, Request::Validators(tx))
			}).await;

			assert_eq!(rx.await.unwrap().unwrap(), runtime_api.validators);

			ctx_handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
		};

		futures::executor::block_on(future::join(subsystem_task, test_task));
	}

	#[test]
	fn requests_validator_groups() {
		let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
445
		let runtime_api = Arc::new(MockRuntimeApi::default());
446
		let relay_parent = [1; 32].into();
447
		let spawner = sp_core::testing::TaskExecutor::new();
448

449
		let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner);
450
		let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
		let test_task = async move {
			let (tx, rx) = oneshot::channel();

			ctx_handle.send(FromOverseer::Communication {
				msg: RuntimeApiMessage::Request(relay_parent, Request::ValidatorGroups(tx))
			}).await;

			assert_eq!(rx.await.unwrap().unwrap().0, runtime_api.validator_groups);

			ctx_handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
		};

		futures::executor::block_on(future::join(subsystem_task, test_task));
	}

	#[test]
	fn requests_availability_cores() {
		let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
469
		let runtime_api = Arc::new(MockRuntimeApi::default());
470
		let relay_parent = [1; 32].into();
471
		let spawner = sp_core::testing::TaskExecutor::new();
472

473
		let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner);
474
		let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
		let test_task = async move {
			let (tx, rx) = oneshot::channel();

			ctx_handle.send(FromOverseer::Communication {
				msg: RuntimeApiMessage::Request(relay_parent, Request::AvailabilityCores(tx))
			}).await;

			assert_eq!(rx.await.unwrap().unwrap(), runtime_api.availability_cores);

			ctx_handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
		};

		futures::executor::block_on(future::join(subsystem_task, test_task));
	}

	#[test]
491
	fn requests_persisted_validation_data() {
492
493
		let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
		let relay_parent = [1; 32].into();
494
495
		let para_a = 5.into();
		let para_b = 6.into();
496
		let spawner = sp_core::testing::TaskExecutor::new();
497

498
499
500
		let mut runtime_api = MockRuntimeApi::default();
		runtime_api.validation_data.insert(para_a, Default::default());
		let runtime_api = Arc::new(runtime_api);
501

502
		let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner);
503
		let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
504
505
506
507
		let test_task = async move {
			let (tx, rx) = oneshot::channel();

			ctx_handle.send(FromOverseer::Communication {
508
509
510
511
				msg: RuntimeApiMessage::Request(
					relay_parent,
					Request::PersistedValidationData(para_a, OccupiedCoreAssumption::Included, tx)
				),
512
513
			}).await;

514
515
516
517
518
519
520
521
522
523
524
			assert_eq!(rx.await.unwrap().unwrap(), Some(Default::default()));

			let (tx, rx) = oneshot::channel();
			ctx_handle.send(FromOverseer::Communication {
				msg: RuntimeApiMessage::Request(
					relay_parent,
					Request::PersistedValidationData(para_b, OccupiedCoreAssumption::Included, tx)
				),
			}).await;

			assert_eq!(rx.await.unwrap().unwrap(), None);
525
526
527
528
529
530
531
532

			ctx_handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
		};

		futures::executor::block_on(future::join(subsystem_task, test_task));
	}

	#[test]
533
	fn requests_full_validation_data() {
534
535
536
537
		let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
		let relay_parent = [1; 32].into();
		let para_a = 5.into();
		let para_b = 6.into();
538
		let spawner = sp_core::testing::TaskExecutor::new();
539

540
541
542
		let mut runtime_api = MockRuntimeApi::default();
		runtime_api.validation_data.insert(para_a, Default::default());
		let runtime_api = Arc::new(runtime_api);
543

544
		let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner);
545
		let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
546
547
548
549
550
551
		let test_task = async move {
			let (tx, rx) = oneshot::channel();

			ctx_handle.send(FromOverseer::Communication {
				msg: RuntimeApiMessage::Request(
					relay_parent,
552
					Request::FullValidationData(para_a, OccupiedCoreAssumption::Included, tx)
553
554
555
556
557
558
559
560
561
				),
			}).await;

			assert_eq!(rx.await.unwrap().unwrap(), Some(Default::default()));

			let (tx, rx) = oneshot::channel();
			ctx_handle.send(FromOverseer::Communication {
				msg: RuntimeApiMessage::Request(
					relay_parent,
562
					Request::FullValidationData(para_b, OccupiedCoreAssumption::Included, tx)
563
564
565
566
567
568
569
570
571
572
573
				),
			}).await;

			assert_eq!(rx.await.unwrap().unwrap(), None);

			ctx_handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
		};

		futures::executor::block_on(future::join(subsystem_task, test_task));
	}

574
575
576
577
578
579
580
	#[test]
	fn requests_check_validation_outputs() {
		let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
		let mut runtime_api = MockRuntimeApi::default();
		let relay_parent = [1; 32].into();
		let para_a = 5.into();
		let para_b = 6.into();
581
		let commitments = polkadot_primitives::v1::CandidateCommitments::default();
582
		let spawner = sp_core::testing::TaskExecutor::new();
583
584
585
586

		runtime_api.validation_outputs_results.insert(para_a, false);
		runtime_api.validation_outputs_results.insert(para_b, true);

587
588
		let runtime_api = Arc::new(runtime_api);

589
		let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner);
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
		let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
		let test_task = async move {
			let (tx, rx) = oneshot::channel();

			ctx_handle.send(FromOverseer::Communication {
				msg: RuntimeApiMessage::Request(
					relay_parent,
					Request::CheckValidationOutputs(
						para_a,
						commitments.clone(),
						tx,
					),
				)
			}).await;
			assert_eq!(
				rx.await.unwrap().unwrap(),
				runtime_api.validation_outputs_results[&para_a],
			);

			let (tx, rx) = oneshot::channel();
			ctx_handle.send(FromOverseer::Communication {
				msg: RuntimeApiMessage::Request(
					relay_parent,
					Request::CheckValidationOutputs(
						para_b,
						commitments,
						tx,
					),
				)
			}).await;
			assert_eq!(
				rx.await.unwrap().unwrap(),
				runtime_api.validation_outputs_results[&para_b],
			);

			ctx_handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
		};

		futures::executor::block_on(future::join(subsystem_task, test_task));
	}

631
632
633
	#[test]
	fn requests_session_index_for_child() {
		let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
634
		let runtime_api = Arc::new(MockRuntimeApi::default());
635
		let relay_parent = [1; 32].into();
636
		let spawner = sp_core::testing::TaskExecutor::new();
637

638
		let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner);
639
		let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
		let test_task = async move {
			let (tx, rx) = oneshot::channel();

			ctx_handle.send(FromOverseer::Communication {
				msg: RuntimeApiMessage::Request(relay_parent, Request::SessionIndexForChild(tx))
			}).await;

			assert_eq!(rx.await.unwrap().unwrap(), runtime_api.session_index_for_child);

			ctx_handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
		};

		futures::executor::block_on(future::join(subsystem_task, test_task));
	}

655
656
657
658
659
660
661
	#[test]
	fn requests_session_info() {
		let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
		let mut runtime_api = MockRuntimeApi::default();
		let session_index = 1;
		runtime_api.session_info.insert(session_index, Default::default());
		let runtime_api = Arc::new(runtime_api);
662
		let spawner = sp_core::testing::TaskExecutor::new();
663
664
665

		let relay_parent = [1; 32].into();

666
		let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner);
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
		let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
		let test_task = async move {
			let (tx, rx) = oneshot::channel();

			ctx_handle.send(FromOverseer::Communication {
				msg: RuntimeApiMessage::Request(relay_parent, Request::SessionInfo(session_index, tx))
			}).await;

			assert_eq!(rx.await.unwrap().unwrap(), Some(Default::default()));

			ctx_handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
		};

		futures::executor::block_on(future::join(subsystem_task, test_task));
	}

683
684
685
	#[test]
	fn requests_validation_code() {
		let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
686

687
688
689
		let relay_parent = [1; 32].into();
		let para_a = 5.into();
		let para_b = 6.into();
690
		let spawner = sp_core::testing::TaskExecutor::new();
691

692
693
694
		let mut runtime_api = MockRuntimeApi::default();
		runtime_api.validation_code.insert(para_a, Default::default());
		let runtime_api = Arc::new(runtime_api);
695

696
		let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner);
697
		let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
		let test_task = async move {
			let (tx, rx) = oneshot::channel();

			ctx_handle.send(FromOverseer::Communication {
				msg: RuntimeApiMessage::Request(
					relay_parent,
					Request::ValidationCode(para_a, OccupiedCoreAssumption::Included, tx)
				),
			}).await;

			assert_eq!(rx.await.unwrap().unwrap(), Some(Default::default()));

			let (tx, rx) = oneshot::channel();
			ctx_handle.send(FromOverseer::Communication {
				msg: RuntimeApiMessage::Request(
					relay_parent,
					Request::ValidationCode(para_b, OccupiedCoreAssumption::Included, tx)
				),
			}).await;

			assert_eq!(rx.await.unwrap().unwrap(), None);

			ctx_handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
		};

		futures::executor::block_on(future::join(subsystem_task, test_task));
	}

	#[test]
	fn requests_candidate_pending_availability() {
		let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
		let relay_parent = [1; 32].into();
		let para_a = 5.into();
		let para_b = 6.into();
732
		let spawner = sp_core::testing::TaskExecutor::new();
733

734
		let mut runtime_api = MockRuntimeApi::default();
735
		runtime_api.candidate_pending_availability.insert(para_a, Default::default());
736
737
		let runtime_api = Arc::new(runtime_api);

738
		let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner);
739
		let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
		let test_task = async move {
			let (tx, rx) = oneshot::channel();

			ctx_handle.send(FromOverseer::Communication {
				msg: RuntimeApiMessage::Request(
					relay_parent,
					Request::CandidatePendingAvailability(para_a, tx),
				)
			}).await;

			assert_eq!(rx.await.unwrap().unwrap(), Some(Default::default()));

			let (tx, rx) = oneshot::channel();

			ctx_handle.send(FromOverseer::Communication {
				msg: RuntimeApiMessage::Request(
					relay_parent,
					Request::CandidatePendingAvailability(para_b, tx),
				)
			}).await;

			assert_eq!(rx.await.unwrap().unwrap(), None);

			ctx_handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
		};

		futures::executor::block_on(future::join(subsystem_task, test_task));
	}

	#[test]
	fn requests_candidate_events() {
		let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
772
		let runtime_api = Arc::new(MockRuntimeApi::default());
773
		let relay_parent = [1; 32].into();
774
		let spawner = sp_core::testing::TaskExecutor::new();
775

776
		let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner);
777
		let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
778
779
780
781
782
783
784
785
786
787
788
789
790
791
		let test_task = async move {
			let (tx, rx) = oneshot::channel();

			ctx_handle.send(FromOverseer::Communication {
				msg: RuntimeApiMessage::Request(relay_parent, Request::CandidateEvents(tx))
			}).await;

			assert_eq!(rx.await.unwrap().unwrap(), runtime_api.candidate_events);

			ctx_handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
		};

		futures::executor::block_on(future::join(subsystem_task, test_task));
	}
792
793
794
795

	#[test]
	fn requests_dmq_contents() {
		let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
796

797
798
799
		let relay_parent = [1; 32].into();
		let para_a = 5.into();
		let para_b = 6.into();
800
		let spawner = sp_core::testing::TaskExecutor::new();
801

802
803
804
805
806
807
808
809
810
811
812
813
814
815
		let runtime_api = Arc::new({
			let mut runtime_api = MockRuntimeApi::default();

			runtime_api.dmq_contents.insert(para_a, vec![]);
			runtime_api.dmq_contents.insert(
				para_b,
				vec![InboundDownwardMessage {
					sent_at: 228,
					msg: b"Novus Ordo Seclorum".to_vec(),
				}],
			);

			runtime_api
		});
816

817
		let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner);
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
		let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
		let test_task = async move {
			let (tx, rx) = oneshot::channel();
			ctx_handle
				.send(FromOverseer::Communication {
					msg: RuntimeApiMessage::Request(relay_parent, Request::DmqContents(para_a, tx)),
				})
				.await;
			assert_eq!(rx.await.unwrap().unwrap(), vec![]);

			let (tx, rx) = oneshot::channel();
			ctx_handle
				.send(FromOverseer::Communication {
					msg: RuntimeApiMessage::Request(relay_parent, Request::DmqContents(para_b, tx)),
				})
				.await;
			assert_eq!(
				rx.await.unwrap().unwrap(),
				vec![InboundDownwardMessage {
					sent_at: 228,
					msg: b"Novus Ordo Seclorum".to_vec(),
				}]
			);

			ctx_handle
				.send(FromOverseer::Signal(OverseerSignal::Conclude))
				.await;
		};
		futures::executor::block_on(future::join(subsystem_task, test_task));
	}

Sergey Pepyakin's avatar
Sergey Pepyakin committed
849
850
851
852
853
854
855
856
	#[test]
	fn requests_inbound_hrmp_channels_contents() {
		let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());

		let relay_parent = [1; 32].into();
		let para_a = 99.into();
		let para_b = 66.into();
		let para_c = 33.into();
857
		let spawner = sp_core::testing::TaskExecutor::new();
Sergey Pepyakin's avatar
Sergey Pepyakin committed
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883

		let para_b_inbound_channels = [
			(para_a, vec![]),
			(
				para_c,
				vec![InboundHrmpMessage {
					sent_at: 1,
					data: "𝙀=𝙈𝘾²".as_bytes().to_owned(),
				}],
			),
		]
		.iter()
		.cloned()
		.collect::<BTreeMap<_, _>>();

		let runtime_api = Arc::new({
			let mut runtime_api = MockRuntimeApi::default();

			runtime_api.hrmp_channels.insert(para_a, BTreeMap::new());
			runtime_api
				.hrmp_channels
				.insert(para_b, para_b_inbound_channels.clone());

			runtime_api
		});

884
		let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner);
Sergey Pepyakin's avatar
Sergey Pepyakin committed
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
		let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
		let test_task = async move {
			let (tx, rx) = oneshot::channel();
			ctx_handle
				.send(FromOverseer::Communication {
					msg: RuntimeApiMessage::Request(
						relay_parent,
						Request::InboundHrmpChannelsContents(para_a, tx),
					),
				})
				.await;
			assert_eq!(rx.await.unwrap().unwrap(), BTreeMap::new());

			let (tx, rx) = oneshot::channel();
			ctx_handle
				.send(FromOverseer::Communication {
					msg: RuntimeApiMessage::Request(
						relay_parent,
						Request::InboundHrmpChannelsContents(para_b, tx),
					),
				})
				.await;
			assert_eq!(rx.await.unwrap().unwrap(), para_b_inbound_channels,);

			ctx_handle
				.send(FromOverseer::Signal(OverseerSignal::Conclude))
				.await;
		};
		futures::executor::block_on(future::join(subsystem_task, test_task));
	}

916
917
918
919
920
921
	#[test]
	fn requests_historical_code() {
		let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());

		let para_a = 5.into();
		let para_b = 6.into();
922
		let spawner = sp_core::testing::TaskExecutor::new();
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940

		let runtime_api = Arc::new({
			let mut runtime_api = MockRuntimeApi::default();

			runtime_api.historical_validation_code.insert(
				para_a,
				vec![(1, vec![1, 2, 3].into()), (10, vec![4, 5, 6].into())],
			);

			runtime_api.historical_validation_code.insert(
				para_b,
				vec![(5, vec![7, 8, 9].into())],
			);

			runtime_api
		});
		let relay_parent = [1; 32].into();

941
		let subsystem = RuntimeApiSubsystem::new(runtime_api, Metrics(None), spawner);
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
		let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
		let test_task = async move {
			{
				let (tx, rx) = oneshot::channel();
				ctx_handle.send(FromOverseer::Communication {
					msg: RuntimeApiMessage::Request(
						relay_parent,
						Request::HistoricalValidationCode(para_a, 5, tx),
					)
				}).await;

				assert_eq!(rx.await.unwrap().unwrap(), Some(ValidationCode::from(vec![1, 2, 3])));
			}

			{
				let (tx, rx) = oneshot::channel();
				ctx_handle.send(FromOverseer::Communication {
					msg: RuntimeApiMessage::Request(
						relay_parent,
						Request::HistoricalValidationCode(para_a, 10, tx),
					)
				}).await;

				assert_eq!(rx.await.unwrap().unwrap(), Some(ValidationCode::from(vec![4, 5, 6])));
			}

			{
				let (tx, rx) = oneshot::channel();
				ctx_handle.send(FromOverseer::Communication {
					msg: RuntimeApiMessage::Request(
						relay_parent,
						Request::HistoricalValidationCode(para_b, 1, tx),
					)
				}).await;

				assert!(rx.await.unwrap().unwrap().is_none());
			}

			ctx_handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
		};

		futures::executor::block_on(future::join(subsystem_task, test_task));
	}
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000

	#[test]
	fn multiple_requests_in_parallel_are_working() {
		let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
		let runtime_api = Arc::new(MockRuntimeApi::default());
		let relay_parent = [1; 32].into();
		let spawner = sp_core::testing::TaskExecutor::new();
		let mutex = runtime_api.availability_cores_wait.clone();

		let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner);
		let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
		let test_task = async move {
			// Make all requests block until we release this mutex.
			let lock = mutex.lock().unwrap();

			let mut receivers = Vec::new();
For faster browsing, not all history is shown. View entire blame