lib.rs 15.2 KB
Newer Older
Andronik Ordian's avatar
Andronik Ordian committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

//! Implements the Chain API Subsystem
//!
//! Provides access to the chain data. Every request may return an error.
//! At the moment, the implementation requires `Client` to implement `HeaderBackend`,
//! we may add more bounds in the future if we will need e.g. block bodies.
//!
//! Supported requests:
//! * Block hash to number
25
//! * Block hash to header
Andronik Ordian's avatar
Andronik Ordian committed
26
27
28
29
//! * Finalized block number to hash
//! * Last finalized block number
//! * Ancestors

30
31
32
#![deny(unused_crate_dependencies, unused_results)]
#![warn(missing_docs)]

Andronik Ordian's avatar
Andronik Ordian committed
33
34
use polkadot_subsystem::{
	FromOverseer, OverseerSignal,
35
	SpawnedSubsystem, Subsystem, SubsystemResult, SubsystemError, SubsystemContext,
Andronik Ordian's avatar
Andronik Ordian committed
36
	messages::ChainApiMessage,
37
38
};
use polkadot_node_subsystem_util::{
39
	metrics::{self, prometheus},
Andronik Ordian's avatar
Andronik Ordian committed
40
41
42
};
use polkadot_primitives::v1::{Block, BlockId};
use sp_blockchain::HeaderBackend;
43
use std::sync::Arc;
Andronik Ordian's avatar
Andronik Ordian committed
44
45
46

use futures::prelude::*;

47
const LOG_TARGET: &str = "parachain::chain-api";
48

Andronik Ordian's avatar
Andronik Ordian committed
49
50
/// The Chain API Subsystem implementation.
pub struct ChainApiSubsystem<Client> {
51
	client: Arc<Client>,
52
	metrics: Metrics,
Andronik Ordian's avatar
Andronik Ordian committed
53
54
55
56
}

impl<Client> ChainApiSubsystem<Client> {
	/// Create a new Chain API subsystem with the given client.
57
	pub fn new(client: Arc<Client>, metrics: Metrics) -> Self {
Andronik Ordian's avatar
Andronik Ordian committed
58
		ChainApiSubsystem {
59
60
			client,
			metrics,
Andronik Ordian's avatar
Andronik Ordian committed
61
62
63
64
65
66
67
68
69
		}
	}
}

impl<Client, Context> Subsystem<Context> for ChainApiSubsystem<Client> where
	Client: HeaderBackend<Block> + 'static,
	Context: SubsystemContext<Message = ChainApiMessage>
{
	fn start(self, ctx: Context) -> SpawnedSubsystem {
70
71
72
		let future = run(ctx, self)
			.map_err(|e| SubsystemError::with_origin("chain-api", e))
			.boxed();
Andronik Ordian's avatar
Andronik Ordian committed
73
		SpawnedSubsystem {
74
			future,
Andronik Ordian's avatar
Andronik Ordian committed
75
76
77
78
79
			name: "chain-api-subsystem",
		}
	}
}

80
#[tracing::instrument(skip(ctx, subsystem), fields(subsystem = LOG_TARGET))]
Andronik Ordian's avatar
Andronik Ordian committed
81
82
async fn run<Client>(
	mut ctx: impl SubsystemContext<Message = ChainApiMessage>,
83
	subsystem: ChainApiSubsystem<Client>,
Andronik Ordian's avatar
Andronik Ordian committed
84
85
86
87
88
89
90
91
) -> SubsystemResult<()>
where
	Client: HeaderBackend<Block>,
{
	loop {
		match ctx.recv().await? {
			FromOverseer::Signal(OverseerSignal::Conclude) => return Ok(()),
			FromOverseer::Signal(OverseerSignal::ActiveLeaves(_)) => {},
92
			FromOverseer::Signal(OverseerSignal::BlockFinalized(..)) => {},
Andronik Ordian's avatar
Andronik Ordian committed
93
94
			FromOverseer::Communication { msg } => match msg {
				ChainApiMessage::BlockNumber(hash, response_channel) => {
95
					let _timer = subsystem.metrics.time_block_number();
96
97
					let result = subsystem.client.number(hash).map_err(|e| e.to_string().into());
					subsystem.metrics.on_request(result.is_ok());
Andronik Ordian's avatar
Andronik Ordian committed
98
99
					let _ = response_channel.send(result);
				},
100
				ChainApiMessage::BlockHeader(hash, response_channel) => {
101
					let _timer = subsystem.metrics.time_block_header();
102
103
104
105
106
107
					let result = subsystem.client
						.header(BlockId::Hash(hash))
						.map_err(|e| e.to_string().into());
					subsystem.metrics.on_request(result.is_ok());
					let _ = response_channel.send(result);
				},
Andronik Ordian's avatar
Andronik Ordian committed
108
				ChainApiMessage::FinalizedBlockHash(number, response_channel) => {
109
					let _timer = subsystem.metrics.time_finalized_block_hash();
Andronik Ordian's avatar
Andronik Ordian committed
110
					// Note: we don't verify it's finalized
111
112
					let result = subsystem.client.hash(number).map_err(|e| e.to_string().into());
					subsystem.metrics.on_request(result.is_ok());
Andronik Ordian's avatar
Andronik Ordian committed
113
114
115
					let _ = response_channel.send(result);
				},
				ChainApiMessage::FinalizedBlockNumber(response_channel) => {
116
					let _timer = subsystem.metrics.time_finalized_block_number();
117
118
119
					let result = subsystem.client.info().finalized_number;
					// always succeeds
					subsystem.metrics.on_request(true);
Andronik Ordian's avatar
Andronik Ordian committed
120
121
122
					let _ = response_channel.send(Ok(result));
				},
				ChainApiMessage::Ancestors { hash, k, response_channel } => {
123
					let _timer = subsystem.metrics.time_ancestors();
124
					tracing::span!(tracing::Level::TRACE, "ChainApiMessage::Ancestors", subsystem=LOG_TARGET, hash=%hash, k=k);
125

Andronik Ordian's avatar
Andronik Ordian committed
126
127
128
					let mut hash = hash;

					let next_parent = core::iter::from_fn(|| {
129
						let maybe_header = subsystem.client.header(BlockId::Hash(hash));
Andronik Ordian's avatar
Andronik Ordian committed
130
131
						match maybe_header {
							// propagate the error
132
133
134
135
							Err(e) => {
								let e = e.to_string().into();
								Some(Err(e))
							},
Andronik Ordian's avatar
Andronik Ordian committed
136
137
138
							// fewer than `k` ancestors are available
							Ok(None) => None,
							Ok(Some(header)) => {
139
140
141
142
143
144
145
								// stop at the genesis header.
								if header.number == 1 {
									None
								} else {
									hash = header.parent_hash;
									Some(Ok(hash))
								}
Andronik Ordian's avatar
Andronik Ordian committed
146
147
148
149
150
							}
						}
					});

					let result = next_parent.take(k).collect::<Result<Vec<_>, _>>();
151
					subsystem.metrics.on_request(result.is_ok());
Andronik Ordian's avatar
Andronik Ordian committed
152
153
154
155
156
157
158
					let _ = response_channel.send(result);
				},
			}
		}
	}
}

159
160
161
#[derive(Clone)]
struct MetricsInner {
	chain_api_requests: prometheus::CounterVec<prometheus::U64>,
162
163
164
165
166
	block_number: prometheus::Histogram,
	block_header: prometheus::Histogram,
	finalized_block_hash: prometheus::Histogram,
	finalized_block_number: prometheus::Histogram,
	ancestors: prometheus::Histogram,
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
}

/// Chain API metrics.
#[derive(Default, Clone)]
pub struct Metrics(Option<MetricsInner>);

impl Metrics {
	fn on_request(&self, succeeded: bool) {
		if let Some(metrics) = &self.0 {
			if succeeded {
				metrics.chain_api_requests.with_label_values(&["succeeded"]).inc();
			} else {
				metrics.chain_api_requests.with_label_values(&["failed"]).inc();
			}
		}
	}
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207

	/// Provide a timer for `block_number` which observes on drop.
	fn time_block_number(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
		self.0.as_ref().map(|metrics| metrics.block_number.start_timer())
	}

	/// Provide a timer for `block_header` which observes on drop.
	fn time_block_header(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
		self.0.as_ref().map(|metrics| metrics.block_header.start_timer())
	}

	/// Provide a timer for `finalized_block_hash` which observes on drop.
	fn time_finalized_block_hash(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
		self.0.as_ref().map(|metrics| metrics.finalized_block_hash.start_timer())
	}

	/// Provide a timer for `finalized_block_number` which observes on drop.
	fn time_finalized_block_number(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
		self.0.as_ref().map(|metrics| metrics.finalized_block_number.start_timer())
	}

	/// Provide a timer for `ancestors` which observes on drop.
	fn time_ancestors(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
		self.0.as_ref().map(|metrics| metrics.ancestors.start_timer())
	}
208
209
210
211
212
213
214
215
216
217
218
}

impl metrics::Metrics for Metrics {
	fn try_register(registry: &prometheus::Registry) -> Result<Self, prometheus::PrometheusError> {
		let metrics = MetricsInner {
			chain_api_requests: prometheus::register(
				prometheus::CounterVec::new(
					prometheus::Opts::new(
						"parachain_chain_api_requests_total",
						"Number of Chain API requests served.",
					),
219
					&["success"],
220
221
222
				)?,
				registry,
			)?,
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
			block_number: prometheus::register(
				prometheus::Histogram::with_opts(
					prometheus::HistogramOpts::new(
						"parachain_chain_api_block_number",
						"Time spent within `chain_api::block_number`",
					)
				)?,
				registry,
			)?,
			block_header: prometheus::register(
				prometheus::Histogram::with_opts(
					prometheus::HistogramOpts::new(
						"parachain_chain_api_block_headers",
						"Time spent within `chain_api::block_headers`",
					)
				)?,
				registry,
			)?,
			finalized_block_hash: prometheus::register(
				prometheus::Histogram::with_opts(
					prometheus::HistogramOpts::new(
						"parachain_chain_api_finalized_block_hash",
						"Time spent within `chain_api::finalized_block_hash`",
					)
				)?,
				registry,
			)?,
			finalized_block_number: prometheus::register(
				prometheus::Histogram::with_opts(
					prometheus::HistogramOpts::new(
						"parachain_chain_api_finalized_block_number",
						"Time spent within `chain_api::finalized_block_number`",
					)
				)?,
				registry,
			)?,
			ancestors: prometheus::register(
				prometheus::Histogram::with_opts(
					prometheus::HistogramOpts::new(
						"parachain_chain_api_ancestors",
						"Time spent within `chain_api::ancestors`",
					)
				)?,
				registry,
			)?,
268
269
270
271
272
273
		};
		Ok(Metrics(Some(metrics)))
	}
}


Andronik Ordian's avatar
Andronik Ordian committed
274
275
276
277
278
279
280
281
#[cfg(test)]
mod tests {
	use super::*;

	use std::collections::BTreeMap;
	use futures::{future::BoxFuture, channel::oneshot};

	use polkadot_primitives::v1::{Hash, BlockNumber, BlockId, Header};
282
	use polkadot_node_subsystem_test_helpers::{make_subsystem_context, TestSubsystemContextHandle};
Andronik Ordian's avatar
Andronik Ordian committed
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
	use sp_blockchain::Info as BlockInfo;
	use sp_core::testing::TaskExecutor;

	#[derive(Clone)]
	struct TestClient {
		blocks: BTreeMap<Hash, BlockNumber>,
		finalized_blocks: BTreeMap<BlockNumber, Hash>,
		headers: BTreeMap<Hash, Header>,
	}

	const ONE: Hash = Hash::repeat_byte(0x01);
	const TWO: Hash = Hash::repeat_byte(0x02);
	const THREE: Hash = Hash::repeat_byte(0x03);
	const FOUR: Hash = Hash::repeat_byte(0x04);
	const ERROR_PATH: Hash = Hash::repeat_byte(0xFF);

	fn default_header() -> Header {
		Header {
			parent_hash: Hash::zero(),
			number: 100500,
303
304
305
			state_root: Hash::zero(),
			extrinsics_root: Hash::zero(),
			digest: Default::default(),
Andronik Ordian's avatar
Andronik Ordian committed
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
		}
	}

	impl Default for TestClient {
		fn default() -> Self {
			Self {
				blocks: maplit::btreemap! {
					ONE => 1,
					TWO => 2,
					THREE => 3,
					FOUR => 4,
				},
				finalized_blocks: maplit::btreemap! {
					1 => ONE,
					3 => THREE,
				},
				headers: maplit::btreemap! {
					TWO => Header {
						parent_hash: ONE,
						number: 2,
						..default_header()
					},
					THREE => Header {
						parent_hash: TWO,
						number: 3,
						..default_header()
					},
					FOUR => Header {
						parent_hash: THREE,
						number: 4,
						..default_header()
					},
					ERROR_PATH => Header {
						..default_header()
					}
				}
			}
		}
	}

	fn last_key_value<K: Clone, V: Clone>(map: &BTreeMap<K, V>) -> (K, V) {
		assert!(!map.is_empty());
		map.iter()
			.last()
			.map(|(k, v)| (k.clone(), v.clone()))
			.unwrap()
	}

	impl HeaderBackend<Block> for TestClient {
		fn info(&self) -> BlockInfo<Block> {
			let genesis_hash = self.blocks.iter().next().map(|(h, _)| *h).unwrap();
			let (best_hash, best_number) = last_key_value(&self.blocks);
			let (finalized_number, finalized_hash) = last_key_value(&self.finalized_blocks);

			BlockInfo {
				best_hash,
				best_number,
				genesis_hash,
				finalized_hash,
				finalized_number,
				number_leaves: 0,
			}
		}
		fn number(&self, hash: Hash) -> sp_blockchain::Result<Option<BlockNumber>> {
			Ok(self.blocks.get(&hash).copied())
		}
		fn hash(&self, number: BlockNumber) -> sp_blockchain::Result<Option<Hash>> {
			Ok(self.finalized_blocks.get(&number).copied())
		}
		fn header(&self, id: BlockId) -> sp_blockchain::Result<Option<Header>> {
			match id {
				// for error path testing
				BlockId::Hash(hash) if hash.is_zero()  => {
					Err(sp_blockchain::Error::Backend("Zero hashes are illegal!".into()))
				}
				BlockId::Hash(hash) => {
					Ok(self.headers.get(&hash).cloned())
				}
				_ => unreachable!(),
			}
		}
		fn status(&self, _id: BlockId) -> sp_blockchain::Result<sp_blockchain::BlockStatus> {
			unimplemented!()
		}
	}

	fn test_harness(
393
		test: impl FnOnce(Arc<TestClient>, TestSubsystemContextHandle<ChainApiMessage>)
Andronik Ordian's avatar
Andronik Ordian committed
394
395
396
			-> BoxFuture<'static, ()>,
	) {
		let (ctx, ctx_handle) = make_subsystem_context(TaskExecutor::new());
397
		let client = Arc::new(TestClient::default());
Andronik Ordian's avatar
Andronik Ordian committed
398

399
400
		let subsystem = ChainApiSubsystem::new(client.clone(), Metrics(None));
		let chain_api_task = run(ctx, subsystem).map(|x| x.unwrap());
Andronik Ordian's avatar
Andronik Ordian committed
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
		let test_task = test(client, ctx_handle);

		futures::executor::block_on(future::join(chain_api_task, test_task));
	}

	#[test]
	fn request_block_number() {
		test_harness(|client, mut sender| {
			async move {
				let zero = Hash::zero();
				let test_cases = [
					(TWO, client.number(TWO).unwrap()),
					(zero, client.number(zero).unwrap()), // not here
				];
				for (hash, expected) in &test_cases {
					let (tx, rx) = oneshot::channel();

					sender.send(FromOverseer::Communication {
						msg: ChainApiMessage::BlockNumber(*hash, tx),
					}).await;

422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
					assert_eq!(rx.await.unwrap().unwrap(), *expected);
				}

				sender.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
			}.boxed()
		})
	}

	#[test]
	fn request_block_header() {
		test_harness(|client, mut sender| {
			async move {
				const NOT_HERE: Hash = Hash::repeat_byte(0x5);
				let test_cases = [
					(TWO, client.header(BlockId::Hash(TWO)).unwrap()),
					(NOT_HERE, client.header(BlockId::Hash(NOT_HERE)).unwrap()),
				];
				for (hash, expected) in &test_cases {
					let (tx, rx) = oneshot::channel();

					sender.send(FromOverseer::Communication {
						msg: ChainApiMessage::BlockHeader(*hash, tx),
					}).await;

Andronik Ordian's avatar
Andronik Ordian committed
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
					assert_eq!(rx.await.unwrap().unwrap(), *expected);
				}

				sender.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
			}.boxed()
		})
	}

	#[test]
	fn request_finalized_hash() {
		test_harness(|client, mut sender| {
			async move {
				let test_cases = [
					(1, client.hash(1).unwrap()), // not here
					(2, client.hash(2).unwrap()),
				];
				for (number, expected) in &test_cases {
					let (tx, rx) = oneshot::channel();

					sender.send(FromOverseer::Communication {
						msg: ChainApiMessage::FinalizedBlockHash(*number, tx),
					}).await;

					assert_eq!(rx.await.unwrap().unwrap(), *expected);
				}

				sender.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
			}.boxed()
		})
	}

	#[test]
	fn request_last_finalized_number() {
		test_harness(|client, mut sender| {
			async move {
				let (tx, rx) = oneshot::channel();

				let expected = client.info().finalized_number;
				sender.send(FromOverseer::Communication {
					msg: ChainApiMessage::FinalizedBlockNumber(tx),
				}).await;

				assert_eq!(rx.await.unwrap().unwrap(), expected);

				sender.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
			}.boxed()
		})
	}

	#[test]
	fn request_ancestors() {
		test_harness(|_client, mut sender| {
			async move {
				let (tx, rx) = oneshot::channel();
				sender.send(FromOverseer::Communication {
					msg: ChainApiMessage::Ancestors { hash: THREE, k: 4, response_channel: tx },
				}).await;
				assert_eq!(rx.await.unwrap().unwrap(), vec![TWO, ONE]);

				let (tx, rx) = oneshot::channel();
				sender.send(FromOverseer::Communication {
					msg: ChainApiMessage::Ancestors { hash: TWO, k: 1, response_channel: tx },
				}).await;
				assert_eq!(rx.await.unwrap().unwrap(), vec![ONE]);

				let (tx, rx) = oneshot::channel();
				sender.send(FromOverseer::Communication {
					msg: ChainApiMessage::Ancestors { hash: ERROR_PATH, k: 2, response_channel: tx },
				}).await;
				assert!(rx.await.unwrap().is_err());

				sender.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
			}.boxed()
		})
	}
}