lib.rs 38.9 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

//! Implements the Runtime API Subsystem
//!
//! This provides a clean, ownerless wrapper around the parachain-related runtime APIs. This crate
//! can also be used to cache responses from heavy runtime APIs.
21

22
23
24
#![deny(unused_crate_dependencies)]
#![warn(missing_docs)]

25
26
27
use polkadot_subsystem::{
	Subsystem, SpawnedSubsystem, SubsystemResult, SubsystemContext,
	FromOverseer, OverseerSignal,
28
29
30
31
	messages::{
		RuntimeApiMessage, RuntimeApiRequest as Request,
	},
	errors::RuntimeApiError,
32
};
33
use polkadot_node_subsystem_util::metrics::{self, prometheus};
34
35
use polkadot_primitives::v1::{Block, BlockId, Hash, ParachainHost};

36
37
use sp_api::ProvideRuntimeApi;
use sp_core::traits::SpawnNamed;
38
use sp_consensus_babe::BabeApi;
39

40
41
use futures::{prelude::*, stream::FuturesUnordered, channel::oneshot, select};
use std::{sync::Arc, collections::VecDeque, pin::Pin};
42
43
44
use cache::{RequestResult, RequestResultCache};

mod cache;
45

46
const LOG_TARGET: &str = "parachain::runtime-api";
47

48
49
50
51
52
53
/// The maximum number of runtime api requests that can be executed in parallel. Further requests will be buffered.
const MAX_PARALLEL_REQUESTS: usize = 4;

/// The name of the blocking task that executes a runtime api request.
const API_REQUEST_TASK_NAME: &str = "polkadot-runtime-api-request";

54
/// The `RuntimeApiSubsystem`. See module docs for more details.
55
pub struct RuntimeApiSubsystem<Client> {
56
	client: Arc<Client>,
57
	metrics: Metrics,
58
	spawn_handle: Box<dyn SpawnNamed>,
59
	/// If there are [`MAX_PARALLEL_REQUESTS`] requests being executed, we buffer them in here until they can be executed.
60
61
62
63
	waiting_requests: VecDeque<(
		Pin<Box<dyn Future<Output = ()> + Send>>,
		oneshot::Receiver<Option<RequestResult>>,
	)>,
64
	/// All the active runtime api requests that are currently being executed.
65
66
67
	active_requests: FuturesUnordered<oneshot::Receiver<Option<RequestResult>>>,
	/// Requests results cache
	requests_cache: RequestResultCache,
68
}
69
70

impl<Client> RuntimeApiSubsystem<Client> {
71
	/// Create a new Runtime API subsystem wrapping the given client and metrics.
72
	pub fn new(client: Arc<Client>, metrics: Metrics, spawn_handle: impl SpawnNamed + 'static) -> Self {
73
74
75
76
77
78
		RuntimeApiSubsystem {
			client,
			metrics,
			spawn_handle: Box::new(spawn_handle),
			waiting_requests: Default::default(),
			active_requests: Default::default(),
79
			requests_cache: RequestResultCache::default(),
80
		}
81
82
83
84
	}
}

impl<Client, Context> Subsystem<Context> for RuntimeApiSubsystem<Client> where
85
	Client: ProvideRuntimeApi<Block> + Send + 'static + Sync,
86
	Client::Api: ParachainHost<Block> + BabeApi<Block>,
87
88
89
90
	Context: SubsystemContext<Message = RuntimeApiMessage>
{
	fn start(self, ctx: Context) -> SpawnedSubsystem {
		SpawnedSubsystem {
91
			future: run(ctx, self).boxed(),
Andronik Ordian's avatar
Andronik Ordian committed
92
			name: "runtime-api-subsystem",
93
94
95
96
		}
	}
}

97
98
impl<Client> RuntimeApiSubsystem<Client> where
	Client: ProvideRuntimeApi<Block> + Send + 'static + Sync,
99
	Client::Api: ParachainHost<Block> + BabeApi<Block>,
100
{
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
	fn store_cache(&mut self, result: RequestResult) {
		use RequestResult::*;

		match result {
			Validators(relay_parent, validators) =>
				self.requests_cache.cache_validators(relay_parent, validators),
			ValidatorGroups(relay_parent, groups) =>
				self.requests_cache.cache_validator_groups(relay_parent, groups),
			AvailabilityCores(relay_parent, cores) =>
				self.requests_cache.cache_availability_cores(relay_parent, cores),
			PersistedValidationData(relay_parent, para_id, assumption, data) =>
				self.requests_cache.cache_persisted_validation_data((relay_parent, para_id, assumption), data),
			CheckValidationOutputs(relay_parent, para_id, commitments, b) =>
				self.requests_cache.cache_check_validation_outputs((relay_parent, para_id, commitments), b),
			SessionIndexForChild(relay_parent, session_index) =>
				self.requests_cache.cache_session_index_for_child(relay_parent, session_index),
			ValidationCode(relay_parent, para_id, assumption, code) =>
				self.requests_cache.cache_validation_code((relay_parent, para_id, assumption), code),
			HistoricalValidationCode(relay_parent, para_id, n, code) =>
				self.requests_cache.cache_historical_validation_code((relay_parent, para_id, n), code),
			CandidatePendingAvailability(relay_parent, para_id, candidate) =>
				self.requests_cache.cache_candidate_pending_availability((relay_parent, para_id), candidate),
			CandidateEvents(relay_parent, events) =>
				self.requests_cache.cache_candidate_events(relay_parent, events),
			SessionInfo(relay_parent, session_index, info) =>
				self.requests_cache.cache_session_info((relay_parent, session_index), info),
			DmqContents(relay_parent, para_id, messages) =>
				self.requests_cache.cache_dmq_contents((relay_parent, para_id), messages),
			InboundHrmpChannelsContents(relay_parent, para_id, contents) =>
				self.requests_cache.cache_inbound_hrmp_channel_contents((relay_parent, para_id), contents),
131
132
			CurrentBabeEpoch(relay_parent, epoch) =>
				self.requests_cache.cache_current_babe_epoch(relay_parent, epoch),
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
		}
	}

	fn query_cache(&mut self, relay_parent: Hash, request: Request) -> Option<Request> {
		macro_rules! query {
			// Just query by relay parent
			($cache_api_name:ident (), $sender:expr) => {{
				let sender = $sender;
				if let Some(value) = self.requests_cache.$cache_api_name(&relay_parent) {
					let _ = sender.send(Ok(value.clone()));
					self.metrics.on_cached_request();
					None
				} else {
					Some(sender)
				}
			}};
			// Query by relay parent + additional parameters
			($cache_api_name:ident ($($param:expr),+), $sender:expr) => {{
				let sender = $sender;
				if let Some(value) = self.requests_cache.$cache_api_name((relay_parent.clone(), $($param.clone()),+)) {
					self.metrics.on_cached_request();
					let _ = sender.send(Ok(value.clone()));
					None
				} else {
					Some(sender)
				}
			}}
		}

		match request {
			Request::Validators(sender) => query!(validators(), sender)
				.map(|sender| Request::Validators(sender)),
			Request::ValidatorGroups(sender) => query!(validator_groups(), sender)
				.map(|sender| Request::ValidatorGroups(sender)),
			Request::AvailabilityCores(sender) => query!(availability_cores(), sender)
				.map(|sender| Request::AvailabilityCores(sender)),
			Request::PersistedValidationData(para, assumption, sender) =>
				query!(persisted_validation_data(para, assumption), sender)
					.map(|sender| Request::PersistedValidationData(para, assumption, sender)),
			Request::CheckValidationOutputs(para, commitments, sender) =>
				query!(check_validation_outputs(para, commitments), sender)
					.map(|sender| Request::CheckValidationOutputs(para, commitments, sender)),
			Request::SessionIndexForChild(sender) =>
				query!(session_index_for_child(), sender)
					.map(|sender| Request::SessionIndexForChild(sender)),
			Request::ValidationCode(para, assumption, sender) =>
				query!(validation_code(para, assumption), sender)
					.map(|sender| Request::ValidationCode(para, assumption, sender)),
			Request::HistoricalValidationCode(para, at, sender) =>
				query!(historical_validation_code(para, at), sender)
					.map(|sender| Request::HistoricalValidationCode(para, at, sender)),
			Request::CandidatePendingAvailability(para, sender) =>
				query!(candidate_pending_availability(para), sender)
					.map(|sender| Request::CandidatePendingAvailability(para, sender)),
			Request::CandidateEvents(sender) => query!(candidate_events(), sender)
				.map(|sender| Request::CandidateEvents(sender)),
			Request::SessionInfo(index, sender) => query!(session_info(index), sender)
				.map(|sender| Request::SessionInfo(index, sender)),
			Request::DmqContents(id, sender) => query!(dmq_contents(id), sender)
				.map(|sender| Request::DmqContents(id, sender)),
			Request::InboundHrmpChannelsContents(id, sender) =>
				query!(inbound_hrmp_channels_contents(id), sender)
195
196
197
198
					.map(|sender| Request::InboundHrmpChannelsContents(id, sender)),
			Request::CurrentBabeEpoch(sender) =>
				query!(current_babe_epoch(), sender)
					.map(|sender| Request::CurrentBabeEpoch(sender)),
199
200
201
		}
	}

202
203
204
205
206
207
208
209
	/// Spawn a runtime api request.
	///
	/// If there are already [`MAX_PARALLEL_REQUESTS`] requests being executed, the request will be buffered.
	fn spawn_request(&mut self, relay_parent: Hash, request: Request) {
		let client = self.client.clone();
		let metrics = self.metrics.clone();
		let (sender, receiver) = oneshot::channel();

210
211
212
213
214
		let request = match self.query_cache(relay_parent.clone(), request) {
			Some(request) => request,
			None => return,
		};

215
		let request = async move {
216
			let result = make_runtime_api_request(
217
218
219
220
221
				client,
				metrics,
				relay_parent,
				request,
			);
222
			let _ = sender.send(result);
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
		}.boxed();

		if self.active_requests.len() >= MAX_PARALLEL_REQUESTS {
			self.waiting_requests.push_back((request, receiver));

			if self.waiting_requests.len() > MAX_PARALLEL_REQUESTS * 10 {
				tracing::warn!(
					target: LOG_TARGET,
					"{} runtime api requests waiting to be executed.",
					self.waiting_requests.len(),
				)
			}
		} else {
			self.spawn_handle.spawn_blocking(API_REQUEST_TASK_NAME, request);
			self.active_requests.push(receiver);
		}
	}

	/// Poll the active runtime api requests.
	async fn poll_requests(&mut self) {
		// If there are no active requests, this future should be pending forever.
		if self.active_requests.len() == 0 {
			return futures::pending!()
		}

		// If there are active requests, this will always resolve to `Some(_)` when a request is finished.
249
250
251
		if let Some(Ok(Some(result))) = self.active_requests.next().await {
			self.store_cache(result);
		}
252
253
254
255
256
257
258
259

		if let Some((req, recv)) = self.waiting_requests.pop_front() {
			self.spawn_handle.spawn_blocking(API_REQUEST_TASK_NAME, req);
			self.active_requests.push(recv);
		}
	}
}

260
#[tracing::instrument(skip(ctx, subsystem), fields(subsystem = LOG_TARGET))]
261
262
async fn run<Client>(
	mut ctx: impl SubsystemContext<Message = RuntimeApiMessage>,
263
	mut subsystem: RuntimeApiSubsystem<Client>,
264
) -> SubsystemResult<()> where
265
	Client: ProvideRuntimeApi<Block> + Send + Sync + 'static,
266
	Client::Api: ParachainHost<Block> + BabeApi<Block>,
267
268
{
	loop {
269
270
271
272
		select! {
			req = ctx.recv().fuse() => match req? {
				FromOverseer::Signal(OverseerSignal::Conclude) => return Ok(()),
				FromOverseer::Signal(OverseerSignal::ActiveLeaves(_)) => {},
273
				FromOverseer::Signal(OverseerSignal::BlockFinalized(..)) => {},
274
275
276
277
278
279
280
				FromOverseer::Communication { msg } => match msg {
					RuntimeApiMessage::Request(relay_parent, request) => {
						subsystem.spawn_request(relay_parent, request);
					},
				}
			},
			_ = subsystem.poll_requests().fuse() => {},
281
282
283
284
		}
	}
}

285
#[tracing::instrument(level = "trace", skip(client, metrics), fields(subsystem = LOG_TARGET))]
286
fn make_runtime_api_request<Client>(
287
288
	client: Arc<Client>,
	metrics: Metrics,
289
290
	relay_parent: Hash,
	request: Request,
291
292
) -> Option<RequestResult>
where
293
	Client: ProvideRuntimeApi<Block>,
294
	Client::Api: ParachainHost<Block> + BabeApi<Block>,
295
{
296
297
	let _timer = metrics.time_make_runtime_api_request();

298
	macro_rules! query {
299
		($req_variant:ident, $api_name:ident (), $sender:expr) => {{
300
301
			let sender = $sender;
			let api = client.runtime_api();
302
			let res = api.$api_name(&BlockId::Hash(relay_parent))
303
				.map_err(|e| RuntimeApiError::from(format!("{:?}", e)));
304
			metrics.on_request(res.is_ok());
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
			let _ = sender.send(res.clone());

			if let Ok(res) = res {
				Some(RequestResult::$req_variant(relay_parent, res.clone()))
			} else {
				None
			}
		}};
		($req_variant:ident, $api_name:ident ($($param:expr),+), $sender:expr) => {{
			let sender = $sender;
			let api = client.runtime_api();
			let res = api.$api_name(&BlockId::Hash(relay_parent), $($param.clone()),*)
				.map_err(|e| RuntimeApiError::from(format!("{:?}", e)));
			metrics.on_request(res.is_ok());
			let _ = sender.send(res.clone());

			if let Ok(res) = res {
				Some(RequestResult::$req_variant(relay_parent, $($param),+, res.clone()))
			} else {
				None
			}
326
327
328
329
		}}
	}

	match request {
330
331
332
		Request::Validators(sender) => query!(Validators, validators(), sender),
		Request::ValidatorGroups(sender) => query!(ValidatorGroups, validator_groups(), sender),
		Request::AvailabilityCores(sender) => query!(AvailabilityCores, availability_cores(), sender),
333
		Request::PersistedValidationData(para, assumption, sender) =>
334
			query!(PersistedValidationData, persisted_validation_data(para, assumption), sender),
335
		Request::CheckValidationOutputs(para, commitments, sender) =>
336
337
			query!(CheckValidationOutputs, check_validation_outputs(para, commitments), sender),
		Request::SessionIndexForChild(sender) => query!(SessionIndexForChild, session_index_for_child(), sender),
338
		Request::ValidationCode(para, assumption, sender) =>
339
			query!(ValidationCode, validation_code(para, assumption), sender),
340
		Request::HistoricalValidationCode(para, at, sender) =>
341
			query!(HistoricalValidationCode, historical_validation_code(para, at), sender),
342
		Request::CandidatePendingAvailability(para, sender) =>
343
344
345
346
347
			query!(CandidatePendingAvailability, candidate_pending_availability(para), sender),
		Request::CandidateEvents(sender) => query!(CandidateEvents, candidate_events(), sender),
		Request::SessionInfo(index, sender) => query!(SessionInfo, session_info(index), sender),
		Request::DmqContents(id, sender) => query!(DmqContents, dmq_contents(id), sender),
		Request::InboundHrmpChannelsContents(id, sender) => query!(InboundHrmpChannelsContents, inbound_hrmp_channels_contents(id), sender),
348
		Request::CurrentBabeEpoch(sender) => query!(CurrentBabeEpoch, current_epoch(), sender),
349
350
351
	}
}

352
353
354
/// The prometheus metrics wrapped by `Metrics`.
#[derive(Clone)]
struct MetricsInner {
	/// Counter of runtime api requests, partitioned by a label.
	chain_api_requests: prometheus::CounterVec<prometheus::U64>,
	/// Histogram timing `make_runtime_api_request`.
	make_runtime_api_request: prometheus::Histogram,
}

/// Runtime API metrics.
///
/// Wraps an optional `MetricsInner`; while it is `None`, the recording methods do nothing and
/// no timer is handed out.
#[derive(Default, Clone)]
pub struct Metrics(Option<MetricsInner>);

impl Metrics {
	fn on_request(&self, succeeded: bool) {
		if let Some(metrics) = &self.0 {
			if succeeded {
				metrics.chain_api_requests.with_label_values(&["succeeded"]).inc();
			} else {
				metrics.chain_api_requests.with_label_values(&["failed"]).inc();
			}
		}
	}
372

373
374
375
376
377
	fn on_cached_request(&self) {
		self.0.as_ref()
			.map(|metrics| metrics.chain_api_requests.with_label_values(&["cached"]).inc());
	}

378
379
380
381
	/// Provide a timer for `make_runtime_api_request` which observes on drop.
	fn time_make_runtime_api_request(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
		self.0.as_ref().map(|metrics| metrics.make_runtime_api_request.start_timer())
	}
382
383
384
385
386
387
388
389
390
391
392
}

impl metrics::Metrics for Metrics {
	/// Create the runtime api metrics and register them with `registry`.
	///
	/// Fails with the first error hit while creating or registering a metric.
	fn try_register(registry: &prometheus::Registry) -> Result<Self, prometheus::PrometheusError> {
		let requests_opts = prometheus::Opts::new(
			"parachain_runtime_api_requests_total",
			"Number of Runtime API requests served.",
		);
		let chain_api_requests = prometheus::register(
			prometheus::CounterVec::new(requests_opts, &["success"])?,
			registry,
		)?;

		let timing_opts = prometheus::HistogramOpts::new(
			"parachain_runtime_api_make_runtime_api_request",
			"Time spent within `runtime_api::make_runtime_api_request`",
		);
		let make_runtime_api_request = prometheus::register(
			prometheus::Histogram::with_opts(timing_opts)?,
			registry,
		)?;

		Ok(Metrics(Some(MetricsInner {
			chain_api_requests,
			make_runtime_api_request,
		})))
	}
}

411
412
413
414
415
#[cfg(test)]
mod tests {
	use super::*;

	use polkadot_primitives::v1::{
416
		ValidatorId, ValidatorIndex, GroupRotationInfo, CoreState, PersistedValidationData,
417
		Id as ParaId, OccupiedCoreAssumption, SessionIndex, ValidationCode,
418
419
		CommittedCandidateReceipt, CandidateEvent, InboundDownwardMessage,
		BlockNumber, InboundHrmpMessage, SessionInfo,
420
	};
421
	use polkadot_node_subsystem_test_helpers as test_helpers;
422
	use sp_core::testing::TaskExecutor;
423
	use std::{collections::{HashMap, BTreeMap}, sync::{Arc, Mutex}};
424
	use futures::channel::oneshot;
425
426
427
	use polkadot_node_primitives::{
		BabeEpoch, BabeEpochConfiguration, BabeAllowedSlots,
	};
428
429
430
431
432
433

	#[derive(Default, Clone)]
	struct MockRuntimeApi {
		validators: Vec<ValidatorId>,
		validator_groups: Vec<Vec<ValidatorIndex>>,
		availability_cores: Vec<CoreState>,
434
		availability_cores_wait: Arc<Mutex<()>>,
435
		validation_data: HashMap<ParaId, PersistedValidationData>,
436
		session_index_for_child: SessionIndex,
437
		session_info: HashMap<SessionIndex, SessionInfo>,
438
		validation_code: HashMap<ParaId, ValidationCode>,
439
		historical_validation_code: HashMap<ParaId, Vec<(BlockNumber, ValidationCode)>>,
440
		validation_outputs_results: HashMap<ParaId, bool>,
441
442
		candidate_pending_availability: HashMap<ParaId, CommittedCandidateReceipt>,
		candidate_events: Vec<CandidateEvent>,
443
		dmq_contents: HashMap<ParaId, Vec<InboundDownwardMessage>>,
Sergey Pepyakin's avatar
Sergey Pepyakin committed
444
		hrmp_channels: HashMap<ParaId, BTreeMap<ParaId, Vec<InboundHrmpMessage>>>,
445
		babe_epoch: Option<BabeEpoch>,
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
	}

	impl ProvideRuntimeApi<Block> for MockRuntimeApi {
		type Api = Self;

		/// Hand out a clone of the mock as the runtime api instance.
		fn runtime_api<'a>(&'a self) -> sp_api::ApiRef<'a, Self::Api> {
			let snapshot: Self = Clone::clone(self);
			snapshot.into()
		}
	}

	sp_api::mock_impl_runtime_apis! {
		impl ParachainHost<Block> for MockRuntimeApi {
			fn validators(&self) -> Vec<ValidatorId> {
				self.validators.clone()
			}

			fn validator_groups(&self) -> (Vec<Vec<ValidatorIndex>>, GroupRotationInfo) {
				(
					self.validator_groups.clone(),
					GroupRotationInfo {
						session_start_block: 1,
						group_rotation_frequency: 100,
						now: 10,
					},
				)
			}

			fn availability_cores(&self) -> Vec<CoreState> {
474
				let _ = self.availability_cores_wait.lock().unwrap();
475
476
477
				self.availability_cores.clone()
			}

478
479
480
481
482
			fn persisted_validation_data(
				&self,
				para: ParaId,
				_assumption: OccupiedCoreAssumption,
			) -> Option<PersistedValidationData> {
483
				self.validation_data.get(&para).cloned()
484
485
			}

486
487
488
			fn check_validation_outputs(
				&self,
				para_id: ParaId,
489
				_commitments: polkadot_primitives::v1::CandidateCommitments,
490
491
492
493
494
495
496
497
498
			) -> bool {
				self.validation_outputs_results
					.get(&para_id)
					.cloned()
					.expect(
						"`check_validation_outputs` called but the expected result hasn't been supplied"
					)
			}

499
500
501
502
			fn session_index_for_child(&self) -> SessionIndex {
				self.session_index_for_child.clone()
			}

503
504
505
506
			fn session_info(&self, index: SessionIndex) -> Option<SessionInfo> {
				self.session_info.get(&index).cloned()
			}

507
508
509
510
511
512
513
514
			fn validation_code(
				&self,
				para: ParaId,
				_assumption: OccupiedCoreAssumption,
			) -> Option<ValidationCode> {
				self.validation_code.get(&para).map(|c| c.clone())
			}

515
516
517
518
519
520
521
522
523
524
525
526
527
			fn historical_validation_code(
				&self,
				para: ParaId,
				at: BlockNumber,
			) -> Option<ValidationCode> {
				self.historical_validation_code.get(&para).and_then(|h_code| {
					h_code.iter()
						.take_while(|(changed_at, _)| changed_at <= &at)
						.last()
						.map(|(_, code)| code.clone())
				})
			}

528
529
530
531
532
533
534
535
536
537
			fn candidate_pending_availability(
				&self,
				para: ParaId,
			) -> Option<CommittedCandidateReceipt> {
				self.candidate_pending_availability.get(&para).map(|c| c.clone())
			}

			fn candidate_events(&self) -> Vec<CandidateEvent> {
				self.candidate_events.clone()
			}
538

539
540
541
			fn dmq_contents(
				&self,
				recipient: ParaId,
Sergey Pepyakin's avatar
Sergey Pepyakin committed
542
			) -> Vec<InboundDownwardMessage> {
543
544
				self.dmq_contents.get(&recipient).map(|q| q.clone()).unwrap_or_default()
			}
Sergey Pepyakin's avatar
Sergey Pepyakin committed
545
546
547
548
549
550
551

			fn inbound_hrmp_channels_contents(
				&self,
				recipient: ParaId
			) -> BTreeMap<ParaId, Vec<InboundHrmpMessage>> {
				self.hrmp_channels.get(&recipient).map(|q| q.clone()).unwrap_or_default()
			}
552
		}
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584

		impl BabeApi<Block> for MockRuntimeApi {
			fn configuration(&self) -> sp_consensus_babe::BabeGenesisConfiguration {
				unimplemented!()
			}

			fn current_epoch_start(&self) -> sp_consensus_babe::Slot {
				self.babe_epoch.as_ref().unwrap().start_slot
			}

			fn current_epoch(&self) -> BabeEpoch {
				self.babe_epoch.as_ref().unwrap().clone()
			}

			fn next_epoch(&self) -> BabeEpoch {
				unimplemented!()
			}

			fn generate_key_ownership_proof(
				_slot: sp_consensus_babe::Slot,
				_authority_id: sp_consensus_babe::AuthorityId,
			) -> Option<sp_consensus_babe::OpaqueKeyOwnershipProof> {
				None
			}

			fn submit_report_equivocation_unsigned_extrinsic(
				_equivocation_proof: sp_consensus_babe::EquivocationProof<polkadot_primitives::v1::Header>,
				_key_owner_proof: sp_consensus_babe::OpaqueKeyOwnershipProof,
			) -> Option<()> {
				None
			}
		}
585
586
587
588
589
	}

	#[test]
	fn requests_validators() {
		// One executor backs the test context, a second one runs the spawned api requests.
		let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
		let spawner = sp_core::testing::TaskExecutor::new();
		let relay_parent = [1; 32].into();
		let runtime_api = Arc::new(MockRuntimeApi::default());

		let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner);
		let subsystem_task = run(ctx, subsystem).map(|res| res.unwrap());
		let test_task = async move {
			let (response_tx, response_rx) = oneshot::channel();
			let msg = RuntimeApiMessage::Request(relay_parent, Request::Validators(response_tx));
			ctx_handle.send(FromOverseer::Communication { msg }).await;

			// The answer must be exactly what the mock holds.
			let validators = response_rx.await.unwrap().unwrap();
			assert_eq!(validators, runtime_api.validators);

			ctx_handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
		};

		futures::executor::block_on(future::join(subsystem_task, test_task));
	}

	#[test]
	fn requests_validator_groups() {
		// One executor backs the test context, a second one runs the spawned api requests.
		let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
		let spawner = sp_core::testing::TaskExecutor::new();
		let relay_parent = [1; 32].into();
		let runtime_api = Arc::new(MockRuntimeApi::default());

		let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner);
		let subsystem_task = run(ctx, subsystem).map(|res| res.unwrap());
		let test_task = async move {
			let (response_tx, response_rx) = oneshot::channel();
			let msg = RuntimeApiMessage::Request(relay_parent, Request::ValidatorGroups(response_tx));
			ctx_handle.send(FromOverseer::Communication { msg }).await;

			// Only the groups are checked; the rotation info is ignored.
			let (groups, _rotation_info) = response_rx.await.unwrap().unwrap();
			assert_eq!(groups, runtime_api.validator_groups);

			ctx_handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
		};

		futures::executor::block_on(future::join(subsystem_task, test_task));
	}

	#[test]
	fn requests_availability_cores() {
		// One executor backs the test context, a second one runs the spawned api requests.
		let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
		let spawner = sp_core::testing::TaskExecutor::new();
		let relay_parent = [1; 32].into();
		let runtime_api = Arc::new(MockRuntimeApi::default());

		let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner);
		let subsystem_task = run(ctx, subsystem).map(|res| res.unwrap());
		let test_task = async move {
			let (response_tx, response_rx) = oneshot::channel();
			let msg = RuntimeApiMessage::Request(relay_parent, Request::AvailabilityCores(response_tx));
			ctx_handle.send(FromOverseer::Communication { msg }).await;

			// The answer must be exactly what the mock holds.
			let cores = response_rx.await.unwrap().unwrap();
			assert_eq!(cores, runtime_api.availability_cores);

			ctx_handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
		};

		futures::executor::block_on(future::join(subsystem_task, test_task));
	}

	#[test]
	fn requests_persisted_validation_data() {
		let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
		let spawner = sp_core::testing::TaskExecutor::new();
		let relay_parent = [1; 32].into();
		let para_a = 5.into();
		let para_b = 6.into();

		// Only `para_a` has validation data in the mock.
		let runtime_api = {
			let mut api = MockRuntimeApi::default();
			api.validation_data.insert(para_a, Default::default());
			Arc::new(api)
		};

		let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner);
		let subsystem_task = run(ctx, subsystem).map(|res| res.unwrap());
		let test_task = async move {
			// Known para: the stored (default) data comes back.
			let (response_tx, response_rx) = oneshot::channel();
			let msg = RuntimeApiMessage::Request(
				relay_parent,
				Request::PersistedValidationData(para_a, OccupiedCoreAssumption::Included, response_tx),
			);
			ctx_handle.send(FromOverseer::Communication { msg }).await;
			assert_eq!(response_rx.await.unwrap().unwrap(), Some(Default::default()));

			// Unknown para: nothing comes back.
			let (response_tx, response_rx) = oneshot::channel();
			let msg = RuntimeApiMessage::Request(
				relay_parent,
				Request::PersistedValidationData(para_b, OccupiedCoreAssumption::Included, response_tx),
			);
			ctx_handle.send(FromOverseer::Communication { msg }).await;
			assert_eq!(response_rx.await.unwrap().unwrap(), None);

			ctx_handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
		};

		futures::executor::block_on(future::join(subsystem_task, test_task));
	}

701
702
703
704
705
706
707
	#[test]
	fn requests_check_validation_outputs() {
		let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
		let spawner = sp_core::testing::TaskExecutor::new();
		let relay_parent = [1; 32].into();
		let para_a = 5.into();
		let para_b = 6.into();
		let commitments = polkadot_primitives::v1::CandidateCommitments::default();

		// The mock rejects the outputs of `para_a` and accepts those of `para_b`.
		let runtime_api = {
			let mut api = MockRuntimeApi::default();
			api.validation_outputs_results.insert(para_a, false);
			api.validation_outputs_results.insert(para_b, true);
			Arc::new(api)
		};

		let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner);
		let subsystem_task = run(ctx, subsystem).map(|res| res.unwrap());
		let test_task = async move {
			// Ask for each para in turn and compare against the mocked verdict.
			for para in vec![para_a, para_b] {
				let (response_tx, response_rx) = oneshot::channel();
				let msg = RuntimeApiMessage::Request(
					relay_parent,
					Request::CheckValidationOutputs(para, commitments.clone(), response_tx),
				);
				ctx_handle.send(FromOverseer::Communication { msg }).await;

				let verdict = response_rx.await.unwrap().unwrap();
				assert_eq!(verdict, runtime_api.validation_outputs_results[&para]);
			}

			ctx_handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
		};

		futures::executor::block_on(future::join(subsystem_task, test_task));
	}

758
759
760
	#[test]
	fn requests_session_index_for_child() {
		// One executor backs the test context, a second one runs the spawned api requests.
		let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
		let spawner = sp_core::testing::TaskExecutor::new();
		let relay_parent = [1; 32].into();
		let runtime_api = Arc::new(MockRuntimeApi::default());

		let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner);
		let subsystem_task = run(ctx, subsystem).map(|res| res.unwrap());
		let test_task = async move {
			let (response_tx, response_rx) = oneshot::channel();
			let msg = RuntimeApiMessage::Request(relay_parent, Request::SessionIndexForChild(response_tx));
			ctx_handle.send(FromOverseer::Communication { msg }).await;

			// The answer must be exactly what the mock holds.
			let session_index = response_rx.await.unwrap().unwrap();
			assert_eq!(session_index, runtime_api.session_index_for_child);

			ctx_handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
		};

		futures::executor::block_on(future::join(subsystem_task, test_task));
	}

782
783
784
785
786
787
788
	#[test]
	fn requests_session_info() {
		let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
		let spawner = sp_core::testing::TaskExecutor::new();
		let relay_parent = [1; 32].into();
		let session_index = 1;

		// The mock knows a (default) session info for `session_index`.
		let runtime_api = {
			let mut api = MockRuntimeApi::default();
			api.session_info.insert(session_index, Default::default());
			Arc::new(api)
		};

		let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner);
		let subsystem_task = run(ctx, subsystem).map(|res| res.unwrap());
		let test_task = async move {
			let (response_tx, response_rx) = oneshot::channel();
			let msg = RuntimeApiMessage::Request(
				relay_parent,
				Request::SessionInfo(session_index, response_tx),
			);
			ctx_handle.send(FromOverseer::Communication { msg }).await;

			assert_eq!(response_rx.await.unwrap().unwrap(), Some(Default::default()));

			ctx_handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
		};

		futures::executor::block_on(future::join(subsystem_task, test_task));
	}

810
811
812
	#[test]
	fn requests_validation_code() {
		let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
		let spawner = sp_core::testing::TaskExecutor::new();
		let relay_parent = [1; 32].into();
		let para_a = 5.into();
		let para_b = 6.into();

		// Only `para_a` has validation code in the mock.
		let runtime_api = {
			let mut api = MockRuntimeApi::default();
			api.validation_code.insert(para_a, Default::default());
			Arc::new(api)
		};

		let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner);
		let subsystem_task = run(ctx, subsystem).map(|res| res.unwrap());
		let test_task = async move {
			// Known para: the stored (default) code comes back.
			let (response_tx, response_rx) = oneshot::channel();
			let msg = RuntimeApiMessage::Request(
				relay_parent,
				Request::ValidationCode(para_a, OccupiedCoreAssumption::Included, response_tx),
			);
			ctx_handle.send(FromOverseer::Communication { msg }).await;
			assert_eq!(response_rx.await.unwrap().unwrap(), Some(Default::default()));

			// Unknown para: nothing comes back.
			let (response_tx, response_rx) = oneshot::channel();
			let msg = RuntimeApiMessage::Request(
				relay_parent,
				Request::ValidationCode(para_b, OccupiedCoreAssumption::Included, response_tx),
			);
			ctx_handle.send(FromOverseer::Communication { msg }).await;
			assert_eq!(response_rx.await.unwrap().unwrap(), None);

			ctx_handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
		};

		futures::executor::block_on(future::join(subsystem_task, test_task));
	}

	/// Sends `Request::CandidatePendingAvailability` for two paras: `para_a`, which has an entry
	/// in the mock runtime, must yield `Some(..)`; `para_b`, which has none, must yield `None`.
	#[test]
	fn requests_candidate_pending_availability() {
		let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
		let relay_parent = [1; 32].into();
		let para_a = 5.into();
		let para_b = 6.into();
		let spawner = sp_core::testing::TaskExecutor::new();

		// Only `para_a` gets a pending candidate; `para_b` is deliberately left out.
		let mut runtime_api = MockRuntimeApi::default();
		runtime_api.candidate_pending_availability.insert(para_a, Default::default());
		let runtime_api = Arc::new(runtime_api);

		// The handle is not read again in this test, so it is moved instead of cloned.
		let subsystem = RuntimeApiSubsystem::new(runtime_api, Metrics(None), spawner);
		let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
		let test_task = async move {
			let (tx, rx) = oneshot::channel();

			ctx_handle.send(FromOverseer::Communication {
				msg: RuntimeApiMessage::Request(
					relay_parent,
					Request::CandidatePendingAvailability(para_a, tx),
				)
			}).await;

			assert_eq!(rx.await.unwrap().unwrap(), Some(Default::default()));

			let (tx, rx) = oneshot::channel();

			ctx_handle.send(FromOverseer::Communication {
				msg: RuntimeApiMessage::Request(
					relay_parent,
					Request::CandidatePendingAvailability(para_b, tx),
				)
			}).await;

			assert_eq!(rx.await.unwrap().unwrap(), None);

			// `future::join` below only resolves once `subsystem_task` has finished too,
			// so the subsystem is told to conclude after the assertions.
			ctx_handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
		};

		futures::executor::block_on(future::join(subsystem_task, test_task));
	}

	/// Sends `Request::CandidateEvents` and checks that the reply equals the
	/// `candidate_events` field of the (default-constructed) mock runtime.
	#[test]
	fn requests_candidate_events() {
		let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
		let runtime_api = Arc::new(MockRuntimeApi::default());
		let relay_parent = [1; 32].into();
		let spawner = sp_core::testing::TaskExecutor::new();

		// Clone the handle: the original is still read inside `test_task` for the assertion.
		let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner);
		let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
		let test_task = async move {
			let (tx, rx) = oneshot::channel();

			ctx_handle.send(FromOverseer::Communication {
				msg: RuntimeApiMessage::Request(relay_parent, Request::CandidateEvents(tx))
			}).await;

			assert_eq!(rx.await.unwrap().unwrap(), runtime_api.candidate_events);

			// `future::join` below only resolves once `subsystem_task` has finished too,
			// so the subsystem is told to conclude after the assertion.
			ctx_handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
		};

		futures::executor::block_on(future::join(subsystem_task, test_task));
	}
919
920
921
922

	/// Sends `Request::DmqContents` for two paras and checks that each reply matches what was
	/// stored in the mock runtime: an empty queue for `para_a` and a single
	/// `InboundDownwardMessage` for `para_b`.
	#[test]
	fn requests_dmq_contents() {
		let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());

		let relay_parent = [1; 32].into();
		let para_a = 5.into();
		let para_b = 6.into();
		let spawner = sp_core::testing::TaskExecutor::new();

		let runtime_api = Arc::new({
			let mut runtime_api = MockRuntimeApi::default();

			// `para_a` has an entry with no messages; `para_b` has exactly one message.
			runtime_api.dmq_contents.insert(para_a, vec![]);
			runtime_api.dmq_contents.insert(
				para_b,
				vec![InboundDownwardMessage {
					sent_at: 228,
					msg: b"Novus Ordo Seclorum".to_vec(),
				}],
			);

			runtime_api
		});

		// The handle is not read again in this test, so it is moved instead of cloned.
		let subsystem = RuntimeApiSubsystem::new(runtime_api, Metrics(None), spawner);
		let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
		let test_task = async move {
			let (tx, rx) = oneshot::channel();
			ctx_handle
				.send(FromOverseer::Communication {
					msg: RuntimeApiMessage::Request(relay_parent, Request::DmqContents(para_a, tx)),
				})
				.await;
			assert_eq!(rx.await.unwrap().unwrap(), vec![]);

			let (tx, rx) = oneshot::channel();
			ctx_handle
				.send(FromOverseer::Communication {
					msg: RuntimeApiMessage::Request(relay_parent, Request::DmqContents(para_b, tx)),
				})
				.await;
			assert_eq!(
				rx.await.unwrap().unwrap(),
				vec![InboundDownwardMessage {
					sent_at: 228,
					msg: b"Novus Ordo Seclorum".to_vec(),
				}]
			);

			// `future::join` below only resolves once `subsystem_task` has finished too,
			// so the subsystem is told to conclude after the assertions.
			ctx_handle
				.send(FromOverseer::Signal(OverseerSignal::Conclude))
				.await;
		};
		futures::executor::block_on(future::join(subsystem_task, test_task));
	}

Sergey Pepyakin's avatar
Sergey Pepyakin committed
976
977
978
979
980
981
982
983
	#[test]
	fn requests_inbound_hrmp_channels_contents() {
		let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());

		let relay_parent = [1; 32].into();
		let para_a = 99.into();
		let para_b = 66.into();
		let para_c = 33.into();
984
		let spawner = sp_core::testing::TaskExecutor::new();
Sergey Pepyakin's avatar
Sergey Pepyakin committed
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000

		let para_b_inbound_channels = [
			(para_a, vec![]),
			(
				para_c,
				vec![InboundHrmpMessage {
					sent_at: 1,
					data: "𝙀=𝙈𝘾²".as_bytes().to_owned(),
				}],
			),
		]
		.iter()
		.cloned()
		.collect::<BTreeMap<_, _>>();

		let runtime_api = Arc::new({
For faster browsing, not all history is shown. View entire blame