// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

//! Implements the Runtime API Subsystem
//!
//! This provides a clean, ownerless wrapper around the parachain-related runtime APIs. This crate
//! can also be used to cache responses from heavy runtime APIs.
21

22
23
24
#![deny(unused_crate_dependencies)]
#![warn(missing_docs)]

Shawn Tabrizi's avatar
Shawn Tabrizi committed
25
26
use polkadot_node_subsystem_util::metrics::{self, prometheus};
use polkadot_primitives::v1::{Block, BlockId, Hash, ParachainHost};
27
use polkadot_subsystem::{
28
	errors::RuntimeApiError,
Shawn Tabrizi's avatar
Shawn Tabrizi committed
29
30
31
	messages::{RuntimeApiMessage, RuntimeApiRequest as Request},
	overseer, FromOverseer, OverseerSignal, SpawnedSubsystem, SubsystemContext, SubsystemError,
	SubsystemResult,
32
33
};

34
use sp_api::ProvideRuntimeApi;
35
use sp_authority_discovery::AuthorityDiscoveryApi;
36
use sp_consensus_babe::BabeApi;
Shawn Tabrizi's avatar
Shawn Tabrizi committed
37
use sp_core::traits::SpawnNamed;
38

39
use cache::{RequestResult, RequestResultCache};
Shawn Tabrizi's avatar
Shawn Tabrizi committed
40
41
use futures::{channel::oneshot, prelude::*, select, stream::FuturesUnordered};
use std::{collections::VecDeque, pin::Pin, sync::Arc};
42
43

mod cache;
44

Andronik Ordian's avatar
Andronik Ordian committed
45
46
47
#[cfg(test)]
mod tests;

/// The `tracing` target used for the log output of this subsystem.
const LOG_TARGET: &str = "parachain::runtime-api";

/// The maximum number of runtime API requests that can be executed in parallel. Further requests
/// are buffered.
const MAX_PARALLEL_REQUESTS: usize = 4;

/// The name of the blocking task that executes a runtime API request.
const API_REQUEST_TASK_NAME: &str = "polkadot-runtime-api-request";

56
/// The `RuntimeApiSubsystem`. See module docs for more details.
57
pub struct RuntimeApiSubsystem<Client> {
58
	client: Arc<Client>,
59
	metrics: Metrics,
60
	spawn_handle: Box<dyn SpawnNamed>,
61
	/// If there are [`MAX_PARALLEL_REQUESTS`] requests being executed, we buffer them in here until they can be executed.
62
63
64
65
	waiting_requests: VecDeque<(
		Pin<Box<dyn Future<Output = ()> + Send>>,
		oneshot::Receiver<Option<RequestResult>>,
	)>,
Denis_P's avatar
Denis_P committed
66
	/// All the active runtime API requests that are currently being executed.
67
68
69
	active_requests: FuturesUnordered<oneshot::Receiver<Option<RequestResult>>>,
	/// Requests results cache
	requests_cache: RequestResultCache,
70
}
71
72

impl<Client> RuntimeApiSubsystem<Client> {
73
	/// Create a new Runtime API subsystem wrapping the given client and metrics.
Shawn Tabrizi's avatar
Shawn Tabrizi committed
74
75
76
77
78
	pub fn new(
		client: Arc<Client>,
		metrics: Metrics,
		spawn_handle: impl SpawnNamed + 'static,
	) -> Self {
79
80
81
82
83
84
		RuntimeApiSubsystem {
			client,
			metrics,
			spawn_handle: Box::new(spawn_handle),
			waiting_requests: Default::default(),
			active_requests: Default::default(),
85
			requests_cache: RequestResultCache::default(),
86
		}
87
88
89
	}
}

Shawn Tabrizi's avatar
Shawn Tabrizi committed
90
91
impl<Client, Context> overseer::Subsystem<Context, SubsystemError> for RuntimeApiSubsystem<Client>
where
92
	Client: ProvideRuntimeApi<Block> + Send + 'static + Sync,
93
	Client::Api: ParachainHost<Block> + BabeApi<Block> + AuthorityDiscoveryApi<Block>,
94
95
	Context: SubsystemContext<Message = RuntimeApiMessage>,
	Context: overseer::SubsystemContext<Message = RuntimeApiMessage>,
96
97
{
	fn start(self, ctx: Context) -> SpawnedSubsystem {
Shawn Tabrizi's avatar
Shawn Tabrizi committed
98
		SpawnedSubsystem { future: run(ctx, self).boxed(), name: "runtime-api-subsystem" }
99
100
101
	}
}

Shawn Tabrizi's avatar
Shawn Tabrizi committed
102
103
impl<Client> RuntimeApiSubsystem<Client>
where
104
	Client: ProvideRuntimeApi<Block> + Send + 'static + Sync,
105
	Client::Api: ParachainHost<Block> + BabeApi<Block> + AuthorityDiscoveryApi<Block>,
106
{
107
108
109
110
	fn store_cache(&mut self, result: RequestResult) {
		use RequestResult::*;

		match result {
111
112
			Authorities(relay_parent, authorities) =>
				self.requests_cache.cache_authorities(relay_parent, authorities),
113
114
115
116
117
118
			Validators(relay_parent, validators) =>
				self.requests_cache.cache_validators(relay_parent, validators),
			ValidatorGroups(relay_parent, groups) =>
				self.requests_cache.cache_validator_groups(relay_parent, groups),
			AvailabilityCores(relay_parent, cores) =>
				self.requests_cache.cache_availability_cores(relay_parent, cores),
Shawn Tabrizi's avatar
Shawn Tabrizi committed
119
120
121
			PersistedValidationData(relay_parent, para_id, assumption, data) => self
				.requests_cache
				.cache_persisted_validation_data((relay_parent, para_id, assumption), data),
122
123
124
125
126
127
128
129
130
			AssumedValidationData(
				_relay_parent,
				para_id,
				expected_persisted_validation_data_hash,
				data,
			) => self.requests_cache.cache_assumed_validation_data(
				(para_id, expected_persisted_validation_data_hash),
				data,
			),
Shawn Tabrizi's avatar
Shawn Tabrizi committed
131
132
133
			CheckValidationOutputs(relay_parent, para_id, commitments, b) => self
				.requests_cache
				.cache_check_validation_outputs((relay_parent, para_id, commitments), b),
134
135
			SessionIndexForChild(relay_parent, session_index) =>
				self.requests_cache.cache_session_index_for_child(relay_parent, session_index),
Shawn Tabrizi's avatar
Shawn Tabrizi committed
136
137
138
			ValidationCode(relay_parent, para_id, assumption, code) => self
				.requests_cache
				.cache_validation_code((relay_parent, para_id, assumption), code),
139
140
			ValidationCodeByHash(_relay_parent, validation_code_hash, code) =>
				self.requests_cache.cache_validation_code_by_hash(validation_code_hash, code),
Shawn Tabrizi's avatar
Shawn Tabrizi committed
141
142
143
			CandidatePendingAvailability(relay_parent, para_id, candidate) => self
				.requests_cache
				.cache_candidate_pending_availability((relay_parent, para_id), candidate),
144
145
			CandidateEvents(relay_parent, events) =>
				self.requests_cache.cache_candidate_events(relay_parent, events),
146
147
			SessionInfo(_relay_parent, session_index, info) =>
				self.requests_cache.cache_session_info(session_index, info),
148
149
			DmqContents(relay_parent, para_id, messages) =>
				self.requests_cache.cache_dmq_contents((relay_parent, para_id), messages),
Shawn Tabrizi's avatar
Shawn Tabrizi committed
150
151
152
			InboundHrmpChannelsContents(relay_parent, para_id, contents) => self
				.requests_cache
				.cache_inbound_hrmp_channel_contents((relay_parent, para_id), contents),
153
154
			CurrentBabeEpoch(relay_parent, epoch) =>
				self.requests_cache.cache_current_babe_epoch(relay_parent, epoch),
155
156
			FetchOnChainVotes(relay_parent, scraped) =>
				self.requests_cache.cache_on_chain_votes(relay_parent, scraped),
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
		}
	}

	fn query_cache(&mut self, relay_parent: Hash, request: Request) -> Option<Request> {
		macro_rules! query {
			// Just query by relay parent
			($cache_api_name:ident (), $sender:expr) => {{
				let sender = $sender;
				if let Some(value) = self.requests_cache.$cache_api_name(&relay_parent) {
					let _ = sender.send(Ok(value.clone()));
					self.metrics.on_cached_request();
					None
				} else {
					Some(sender)
				}
			}};
			// Query by relay parent + additional parameters
			($cache_api_name:ident ($($param:expr),+), $sender:expr) => {{
				let sender = $sender;
				if let Some(value) = self.requests_cache.$cache_api_name((relay_parent.clone(), $($param.clone()),+)) {
					self.metrics.on_cached_request();
					let _ = sender.send(Ok(value.clone()));
					None
				} else {
					Some(sender)
				}
			}}
		}

		match request {
Shawn Tabrizi's avatar
Shawn Tabrizi committed
187
188
189
190
191
192
			Request::Authorities(sender) =>
				query!(authorities(), sender).map(|sender| Request::Authorities(sender)),
			Request::Validators(sender) =>
				query!(validators(), sender).map(|sender| Request::Validators(sender)),
			Request::ValidatorGroups(sender) =>
				query!(validator_groups(), sender).map(|sender| Request::ValidatorGroups(sender)),
193
194
195
196
197
			Request::AvailabilityCores(sender) => query!(availability_cores(), sender)
				.map(|sender| Request::AvailabilityCores(sender)),
			Request::PersistedValidationData(para, assumption, sender) =>
				query!(persisted_validation_data(para, assumption), sender)
					.map(|sender| Request::PersistedValidationData(para, assumption, sender)),
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
			Request::AssumedValidationData(
				para,
				expected_persisted_validation_data_hash,
				sender,
			) => query!(
				assumed_validation_data(para, expected_persisted_validation_data_hash),
				sender
			)
			.map(|sender| {
				Request::AssumedValidationData(
					para,
					expected_persisted_validation_data_hash,
					sender,
				)
			}),
213
214
215
			Request::CheckValidationOutputs(para, commitments, sender) =>
				query!(check_validation_outputs(para, commitments), sender)
					.map(|sender| Request::CheckValidationOutputs(para, commitments, sender)),
Shawn Tabrizi's avatar
Shawn Tabrizi committed
216
217
			Request::SessionIndexForChild(sender) => query!(session_index_for_child(), sender)
				.map(|sender| Request::SessionIndexForChild(sender)),
218
219
220
			Request::ValidationCode(para, assumption, sender) =>
				query!(validation_code(para, assumption), sender)
					.map(|sender| Request::ValidationCode(para, assumption, sender)),
221
222
223
			Request::ValidationCodeByHash(validation_code_hash, sender) =>
				query!(validation_code_by_hash(validation_code_hash), sender)
					.map(|sender| Request::ValidationCodeByHash(validation_code_hash, sender)),
224
225
226
			Request::CandidatePendingAvailability(para, sender) =>
				query!(candidate_pending_availability(para), sender)
					.map(|sender| Request::CandidatePendingAvailability(para, sender)),
Shawn Tabrizi's avatar
Shawn Tabrizi committed
227
228
			Request::CandidateEvents(sender) =>
				query!(candidate_events(), sender).map(|sender| Request::CandidateEvents(sender)),
229
230
			Request::SessionInfo(index, sender) => query!(session_info(index), sender)
				.map(|sender| Request::SessionInfo(index, sender)),
Shawn Tabrizi's avatar
Shawn Tabrizi committed
231
232
			Request::DmqContents(id, sender) =>
				query!(dmq_contents(id), sender).map(|sender| Request::DmqContents(id, sender)),
233
234
			Request::InboundHrmpChannelsContents(id, sender) =>
				query!(inbound_hrmp_channels_contents(id), sender)
235
236
					.map(|sender| Request::InboundHrmpChannelsContents(id, sender)),
			Request::CurrentBabeEpoch(sender) =>
Shawn Tabrizi's avatar
Shawn Tabrizi committed
237
				query!(current_babe_epoch(), sender).map(|sender| Request::CurrentBabeEpoch(sender)),
238
239
			Request::FetchOnChainVotes(sender) =>
				query!(on_chain_votes(), sender).map(|sender| Request::FetchOnChainVotes(sender)),
240
241
242
		}
	}

Denis_P's avatar
Denis_P committed
243
	/// Spawn a runtime API request.
244
245
246
247
248
249
250
	///
	/// If there are already [`MAX_PARALLEL_REQUESTS`] requests being executed, the request will be buffered.
	fn spawn_request(&mut self, relay_parent: Hash, request: Request) {
		let client = self.client.clone();
		let metrics = self.metrics.clone();
		let (sender, receiver) = oneshot::channel();

251
252
253
254
255
		let request = match self.query_cache(relay_parent.clone(), request) {
			Some(request) => request,
			None => return,
		};

256
		let request = async move {
Shawn Tabrizi's avatar
Shawn Tabrizi committed
257
			let result = make_runtime_api_request(client, metrics, relay_parent, request);
258
			let _ = sender.send(result);
Shawn Tabrizi's avatar
Shawn Tabrizi committed
259
260
		}
		.boxed();
261
262
263
264
265
266
267

		if self.active_requests.len() >= MAX_PARALLEL_REQUESTS {
			self.waiting_requests.push_back((request, receiver));

			if self.waiting_requests.len() > MAX_PARALLEL_REQUESTS * 10 {
				tracing::warn!(
					target: LOG_TARGET,
Denis_P's avatar
Denis_P committed
268
					"{} runtime API requests waiting to be executed.",
269
270
271
272
					self.waiting_requests.len(),
				)
			}
		} else {
273
274
			self.spawn_handle
				.spawn_blocking(API_REQUEST_TASK_NAME, Some("runtime-api"), request);
275
276
277
278
			self.active_requests.push(receiver);
		}
	}

Denis_P's avatar
Denis_P committed
279
	/// Poll the active runtime API requests.
280
281
282
283
284
285
286
	async fn poll_requests(&mut self) {
		// If there are no active requests, this future should be pending forever.
		if self.active_requests.len() == 0 {
			return futures::pending!()
		}

		// If there are active requests, this will always resolve to `Some(_)` when a request is finished.
287
288
289
		if let Some(Ok(Some(result))) = self.active_requests.next().await {
			self.store_cache(result);
		}
290
291

		if let Some((req, recv)) = self.waiting_requests.pop_front() {
292
293
			self.spawn_handle
				.spawn_blocking(API_REQUEST_TASK_NAME, Some("runtime-api"), req);
294
295
296
297
298
			self.active_requests.push(recv);
		}
	}
}

299
300
async fn run<Client, Context>(
	mut ctx: Context,
301
	mut subsystem: RuntimeApiSubsystem<Client>,
Shawn Tabrizi's avatar
Shawn Tabrizi committed
302
303
) -> SubsystemResult<()>
where
304
	Client: ProvideRuntimeApi<Block> + Send + Sync + 'static,
305
	Client::Api: ParachainHost<Block> + BabeApi<Block> + AuthorityDiscoveryApi<Block>,
306
307
	Context: SubsystemContext<Message = RuntimeApiMessage>,
	Context: overseer::SubsystemContext<Message = RuntimeApiMessage>,
308
309
{
	loop {
310
311
312
313
		select! {
			req = ctx.recv().fuse() => match req? {
				FromOverseer::Signal(OverseerSignal::Conclude) => return Ok(()),
				FromOverseer::Signal(OverseerSignal::ActiveLeaves(_)) => {},
314
				FromOverseer::Signal(OverseerSignal::BlockFinalized(..)) => {},
315
316
317
318
319
320
321
				FromOverseer::Communication { msg } => match msg {
					RuntimeApiMessage::Request(relay_parent, request) => {
						subsystem.spawn_request(relay_parent, request);
					},
				}
			},
			_ = subsystem.poll_requests().fuse() => {},
322
323
324
325
326
		}
	}
}

fn make_runtime_api_request<Client>(
327
328
	client: Arc<Client>,
	metrics: Metrics,
329
330
	relay_parent: Hash,
	request: Request,
331
332
) -> Option<RequestResult>
where
333
	Client: ProvideRuntimeApi<Block>,
334
	Client::Api: ParachainHost<Block> + BabeApi<Block> + AuthorityDiscoveryApi<Block>,
335
{
336
337
	let _timer = metrics.time_make_runtime_api_request();

338
	macro_rules! query {
thiolliere's avatar
thiolliere committed
339
		($req_variant:ident, $api_name:ident ($($param:expr),*), $sender:expr) => {{
340
341
			let sender = $sender;
			let api = client.runtime_api();
thiolliere's avatar
thiolliere committed
342
			let res = api.$api_name(&BlockId::Hash(relay_parent) $(, $param.clone() )*)
343
				.map_err(|e| RuntimeApiError::from(format!("{:?}", e)));
344
			metrics.on_request(res.is_ok());
345
346
			let _ = sender.send(res.clone());

thiolliere's avatar
thiolliere committed
347
			res.ok().map(|res| RequestResult::$req_variant(relay_parent, $( $param, )* res))
348
349
350
351
		}}
	}

	match request {
352
		Request::Authorities(sender) => query!(Authorities, authorities(), sender),
353
354
		Request::Validators(sender) => query!(Validators, validators(), sender),
		Request::ValidatorGroups(sender) => query!(ValidatorGroups, validator_groups(), sender),
Shawn Tabrizi's avatar
Shawn Tabrizi committed
355
356
		Request::AvailabilityCores(sender) =>
			query!(AvailabilityCores, availability_cores(), sender),
357
		Request::PersistedValidationData(para, assumption, sender) =>
358
			query!(PersistedValidationData, persisted_validation_data(para, assumption), sender),
359
360
361
362
363
364
		Request::AssumedValidationData(para, expected_persisted_validation_data_hash, sender) =>
			query!(
				AssumedValidationData,
				assumed_validation_data(para, expected_persisted_validation_data_hash),
				sender
			),
365
		Request::CheckValidationOutputs(para, commitments, sender) =>
366
			query!(CheckValidationOutputs, check_validation_outputs(para, commitments), sender),
Shawn Tabrizi's avatar
Shawn Tabrizi committed
367
368
		Request::SessionIndexForChild(sender) =>
			query!(SessionIndexForChild, session_index_for_child(), sender),
369
		Request::ValidationCode(para, assumption, sender) =>
370
			query!(ValidationCode, validation_code(para, assumption), sender),
371
372
		Request::ValidationCodeByHash(validation_code_hash, sender) =>
			query!(ValidationCodeByHash, validation_code_by_hash(validation_code_hash), sender),
373
		Request::CandidatePendingAvailability(para, sender) =>
374
375
376
377
			query!(CandidatePendingAvailability, candidate_pending_availability(para), sender),
		Request::CandidateEvents(sender) => query!(CandidateEvents, candidate_events(), sender),
		Request::SessionInfo(index, sender) => query!(SessionInfo, session_info(index), sender),
		Request::DmqContents(id, sender) => query!(DmqContents, dmq_contents(id), sender),
Shawn Tabrizi's avatar
Shawn Tabrizi committed
378
379
		Request::InboundHrmpChannelsContents(id, sender) =>
			query!(InboundHrmpChannelsContents, inbound_hrmp_channels_contents(id), sender),
380
		Request::CurrentBabeEpoch(sender) => query!(CurrentBabeEpoch, current_epoch(), sender),
381
		Request::FetchOnChainVotes(sender) => query!(FetchOnChainVotes, on_chain_votes(), sender),
382
383
384
	}
}

385
386
387
#[derive(Clone)]
struct MetricsInner {
	chain_api_requests: prometheus::CounterVec<prometheus::U64>,
388
	make_runtime_api_request: prometheus::Histogram,
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
}

/// Runtime API metrics.
///
/// Wraps an optional `MetricsInner`. The derived `Default` holds `None`, in which case nothing
/// is recorded.
#[derive(Default, Clone)]
pub struct Metrics(Option<MetricsInner>);

impl Metrics {
	fn on_request(&self, succeeded: bool) {
		if let Some(metrics) = &self.0 {
			if succeeded {
				metrics.chain_api_requests.with_label_values(&["succeeded"]).inc();
			} else {
				metrics.chain_api_requests.with_label_values(&["failed"]).inc();
			}
		}
	}
405

406
	fn on_cached_request(&self) {
Shawn Tabrizi's avatar
Shawn Tabrizi committed
407
408
		self.0
			.as_ref()
409
410
411
			.map(|metrics| metrics.chain_api_requests.with_label_values(&["cached"]).inc());
	}

412
	/// Provide a timer for `make_runtime_api_request` which observes on drop.
Shawn Tabrizi's avatar
Shawn Tabrizi committed
413
414
415
	fn time_make_runtime_api_request(
		&self,
	) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
416
417
		self.0.as_ref().map(|metrics| metrics.make_runtime_api_request.start_timer())
	}
418
419
420
421
422
423
424
425
426
427
428
}

impl metrics::Metrics for Metrics {
	/// Create the collectors of this subsystem and register them with `registry`.
	///
	/// Any error from creating or registering a collector is returned to the caller.
	fn try_register(registry: &prometheus::Registry) -> Result<Self, prometheus::PrometheusError> {
		let metrics = MetricsInner {
			chain_api_requests: prometheus::register(
				prometheus::CounterVec::new(
					prometheus::Opts::new(
						"parachain_runtime_api_requests_total",
						"Number of Runtime API requests served.",
					),
					// The label `success` is set to `succeeded`, `failed` or `cached` by the
					// recording methods of `Metrics`.
					&["success"],
				)?,
				registry,
			)?,
			make_runtime_api_request: prometheus::register(
				prometheus::Histogram::with_opts(prometheus::HistogramOpts::new(
					"parachain_runtime_api_make_runtime_api_request",
					"Time spent within `runtime_api::make_runtime_api_request`",
				))?,
				registry,
			)?,
		};
		Ok(Metrics(Some(metrics)))
	}
}