lib.rs 66 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

//! The Approval Voting Subsystem.
//!
//! This subsystem is responsible for determining candidates to do approval checks
//! on, performing those approval checks, and tracking the assignments and approvals
//! of others. It uses this information to determine when candidates and blocks have
//! been sufficiently approved to finalize.

24
use polkadot_node_subsystem::{
25
	messages::{
26
27
		AssignmentCheckError, AssignmentCheckResult, ApprovalCheckError, ApprovalCheckResult,
		ApprovalVotingMessage, RuntimeApiMessage, RuntimeApiRequest, ChainApiMessage,
28
		ApprovalDistributionMessage, CandidateValidationMessage,
29
		AvailabilityRecoveryMessage,
30
31
32
	},
	errors::RecoveryError,
	Subsystem, SubsystemContext, SubsystemError, SubsystemResult, SpawnedSubsystem,
33
	FromOverseer, OverseerSignal, SubsystemSender,
34
};
35
use polkadot_node_subsystem_util::{
36
	TimeoutExt,
37
	metrics::{self, prometheus},
38
	rolling_session_window::RollingSessionWindow,
39
};
40
41
use polkadot_primitives::v1::{
	ValidatorIndex, Hash, SessionIndex, SessionInfo, CandidateHash,
42
43
	CandidateReceipt, BlockNumber,
	ValidatorPair, ValidatorSignature, ValidatorId,
44
	CandidateIndex, GroupIndex, ApprovalVote,
45
};
46
use polkadot_node_primitives::ValidationResult;
47
use polkadot_node_primitives::approval::{
48
	IndirectAssignmentCert, IndirectSignedApprovalVote, DelayTranche, BlockApprovalMeta,
49
};
50
use polkadot_node_jaeger as jaeger;
51
use sc_keystore::LocalKeystore;
52
use sp_consensus::SyncOracle;
53
54
55
use sp_consensus_slots::Slot;
use sp_runtime::traits::AppVerify;
use sp_application_crypto::Pair;
56
use kvdb::KeyValueDB;
57

58
use futures::prelude::*;
59
60
61
use futures::future::{BoxFuture, RemoteHandle};
use futures::channel::oneshot;
use futures::stream::FuturesUnordered;
62

63
use std::collections::{BTreeMap, HashMap, HashSet};
64
65
use std::collections::btree_map::Entry;
use std::sync::Arc;
66
use std::time::Duration;
67
68
69
70
71
72
73
74
75
76
77
78
79

use approval_checking::RequiredTranches;
use persisted_entries::{ApprovalEntry, CandidateEntry, BlockEntry};
use criteria::{AssignmentCriteria, RealAssignmentCriteria};
use time::{slot_number_to_tick, Tick, Clock, ClockExt, SystemClock};

mod approval_checking;
mod approval_db;
mod criteria;
mod import;
mod time;
mod persisted_entries;

80
81
use crate::approval_db::v1::Config as DatabaseConfig;

82
83
84
85
#[cfg(test)]
mod tests;

const APPROVAL_SESSIONS: SessionIndex = 6;
86
87
const APPROVAL_CHECKING_TIMEOUT: Duration = Duration::from_secs(120);
const APPROVAL_CACHE_SIZE: usize = 1024;
88
const LOG_TARGET: &str = "parachain::approval-voting";
89

90
/// Configuration for the approval voting subsystem
#[derive(Debug, Clone)]
pub struct Config {
	/// The column family in the DB where approval-voting data is stored.
	pub col_data: u32,
	/// The slot duration of the consensus algorithm, in milliseconds. Should be evenly
	/// divisible by 500.
	pub slot_duration_millis: u64,
}

100
101
102
103
104
105
106
107
108
109
110
111
112
113
// The mode of the approval voting subsystem. It should start in a `Syncing` mode when it first
// starts, and then once it's reached the head of the chain it should move into the `Active` mode.
//
// In `Active` mode, the node is an active participant in the approvals protocol. When syncing,
// the node follows the new incoming blocks and finalized number, but does not yet participate.
//
// When transitioning from `Syncing` to `Active`, the node notifies the `ApprovalDistribution`
// subsystem of all unfinalized blocks and the candidates included within them, as well as all
// votes that the local node itself has cast on candidates within those blocks.
enum Mode {
	// The node takes part in the approvals protocol.
	Active,
	// The node is still syncing. The boxed oracle is queried (`is_major_syncing`) by the
	// main loop to decide when to switch to `Active`.
	Syncing(Box<dyn SyncOracle + Send>),
}

114
/// The approval voting subsystem.
115
pub struct ApprovalVotingSubsystem {
116
117
118
	/// LocalKeystore is needed for assignment keys, but not necessarily approval keys.
	///
	/// We do a lot of VRF signing and need the keys to have low latency.
119
	keystore: Arc<LocalKeystore>,
120
	db_config: DatabaseConfig,
121
	slot_duration_millis: u64,
122
	db: Arc<dyn KeyValueDB>,
123
	mode: Mode,
124
125
126
127
128
129
	metrics: Metrics,
}

/// The registered prometheus collectors backing [`Metrics`].
#[derive(Clone)]
struct MetricsInner {
	/// Number of candidates imported by the subsystem.
	imported_candidates_total: prometheus::Counter<prometheus::U64>,
	/// Histogram of the tranches of produced assignments.
	assignments_produced: prometheus::Histogram,
	/// Number of approvals produced, labelled by `status`.
	approvals_produced_total: prometheus::CounterVec<prometheus::U64>,
	/// Number of assignments that became no-shows.
	no_shows_total: prometheus::Counter<prometheus::U64>,
	/// Number of wakeups processed.
	wakeups_triggered_total: prometheus::Counter<prometheus::U64>,
	/// Histogram of ticks taken to approve candidates.
	candidate_approval_time_ticks: prometheus::Histogram,
	/// Histogram of ticks taken to approve blocks.
	block_approval_time_ticks: prometheus::Histogram,
	/// Histogram of time spent writing approval-db transactions.
	time_db_transaction: prometheus::Histogram,
	/// Histogram of time spent recovering and approving data.
	time_recover_and_approve: prometheus::Histogram,
}

/// Approval Voting metrics.
///
/// Wraps an optional set of registered collectors; when it is `None` (the `Default`),
/// every recording method is a no-op.
#[derive(Default, Clone)]
pub struct Metrics(Option<MetricsInner>);

impl Metrics {
	fn on_candidate_imported(&self) {
		if let Some(metrics) = &self.0 {
			metrics.imported_candidates_total.inc();
		}
	}

151
	fn on_assignment_produced(&self, tranche: DelayTranche) {
152
		if let Some(metrics) = &self.0 {
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
			metrics.assignments_produced.observe(tranche as f64);
		}
	}

	fn on_approval_stale(&self) {
		if let Some(metrics) = &self.0 {
			metrics.approvals_produced_total.with_label_values(&["stale"]).inc()
		}
	}

	fn on_approval_invalid(&self) {
		if let Some(metrics) = &self.0 {
			metrics.approvals_produced_total.with_label_values(&["invalid"]).inc()
		}
	}

	fn on_approval_unavailable(&self) {
		if let Some(metrics) = &self.0 {
			metrics.approvals_produced_total.with_label_values(&["unavailable"]).inc()
		}
	}

	fn on_approval_error(&self) {
		if let Some(metrics) = &self.0 {
			metrics.approvals_produced_total.with_label_values(&["internal error"]).inc()
178
179
180
181
182
		}
	}

	fn on_approval_produced(&self) {
		if let Some(metrics) = &self.0 {
183
			metrics.approvals_produced_total.with_label_values(&["success"]).inc()
184
185
		}
	}
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209

	fn on_no_shows(&self, n: usize) {
		if let Some(metrics) = &self.0 {
			metrics.no_shows_total.inc_by(n as u64);
		}
	}

	fn on_wakeup(&self) {
		if let Some(metrics) = &self.0 {
			metrics.wakeups_triggered_total.inc();
		}
	}

	fn on_candidate_approved(&self, ticks: Tick) {
		if let Some(metrics) = &self.0 {
			metrics.candidate_approval_time_ticks.observe(ticks as f64);
		}
	}

	fn on_block_approved(&self, ticks: Tick) {
		if let Some(metrics) = &self.0 {
			metrics.block_approval_time_ticks.observe(ticks as f64);
		}
	}
210
211
212
213

	fn time_db_transaction(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
		self.0.as_ref().map(|metrics| metrics.time_db_transaction.start_timer())
	}
214
215
216
217

	fn time_recover_and_approve(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
		self.0.as_ref().map(|metrics| metrics.time_recover_and_approve.start_timer())
	}
218
219
220
221
222
223
224
225
226
227
228
229
230
231
}

impl metrics::Metrics for Metrics {
	fn try_register(
		registry: &prometheus::Registry,
	) -> std::result::Result<Self, prometheus::PrometheusError> {
		let metrics = MetricsInner {
			imported_candidates_total: prometheus::register(
				prometheus::Counter::new(
					"parachain_imported_candidates_total",
					"Number of candidates imported by the approval voting subsystem",
				)?,
				registry,
			)?,
232
233
234
235
236
237
			assignments_produced: prometheus::register(
				prometheus::Histogram::with_opts(
					prometheus::HistogramOpts::new(
						"parachain_assignments_produced",
						"Assignments and tranches produced by the approval voting subsystem",
					).buckets(vec![0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 15.0, 25.0, 40.0, 70.0]),
238
239
240
241
				)?,
				registry,
			)?,
			approvals_produced_total: prometheus::register(
242
243
244
245
246
247
				prometheus::CounterVec::new(
					prometheus::Opts::new(
						"parachain_approvals_produced_total",
						"Number of approvals produced by the approval voting subsystem",
					),
					&["status"]
248
249
250
				)?,
				registry,
			)?,
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
			no_shows_total: prometheus::register(
				prometheus::Counter::new(
					"parachain_approvals_no_shows_total",
					"Number of assignments which became no-shows in the approval voting subsystem",
				)?,
				registry,
			)?,
			wakeups_triggered_total: prometheus::register(
				prometheus::Counter::new(
					"parachain_approvals_wakeups_total",
					"Number of times we woke up to process a candidate in the approval voting subsystem",
				)?,
				registry,
			)?,
			candidate_approval_time_ticks: prometheus::register(
				prometheus::Histogram::with_opts(
					prometheus::HistogramOpts::new(
						"parachain_approvals_candidate_approval_time_ticks",
						"Number of ticks (500ms) to approve candidates.",
					).buckets(vec![6.0, 12.0, 18.0, 24.0, 30.0, 36.0, 72.0, 100.0, 144.0]),
				)?,
				registry,
			)?,
			block_approval_time_ticks: prometheus::register(
				prometheus::Histogram::with_opts(
					prometheus::HistogramOpts::new(
						"parachain_approvals_blockapproval_time_ticks",
						"Number of ticks (500ms) to approve blocks.",
					).buckets(vec![6.0, 12.0, 18.0, 24.0, 30.0, 36.0, 72.0, 100.0, 144.0]),
				)?,
				registry,
			)?,
283
284
285
286
287
288
289
290
291
			time_db_transaction: prometheus::register(
				prometheus::Histogram::with_opts(
					prometheus::HistogramOpts::new(
						"parachain_time_approval_db_transaction",
						"Time spent writing an approval db transaction.",
					)
				)?,
				registry,
			)?,
292
293
294
295
296
297
298
299
300
			time_recover_and_approve: prometheus::register(
				prometheus::Histogram::with_opts(
					prometheus::HistogramOpts::new(
						"parachain_time_recover_and_approve",
						"Time spent recovering and approving data in approval voting",
					)
				)?,
				registry,
			)?,
301
		};
302

303
304
		Ok(Metrics(Some(metrics)))
	}
305
306
}

307
impl ApprovalVotingSubsystem {
308
	/// Create a new approval voting subsystem with the given keystore, config, and database.
309
310
	pub fn with_config(
		config: Config,
311
		db: Arc<dyn KeyValueDB>,
312
		keystore: Arc<LocalKeystore>,
313
		sync_oracle: Box<dyn SyncOracle + Send>,
314
		metrics: Metrics,
315
316
	) -> Self {
		ApprovalVotingSubsystem {
317
			keystore,
318
			slot_duration_millis: config.slot_duration_millis,
319
			db,
320
321
322
323
			db_config: DatabaseConfig {
				col_data: config.col_data,
			},
			mode: Mode::Syncing(sync_oracle),
324
			metrics,
325
		}
326
327
328
	}
}

329
330
331
impl<C> Subsystem<C> for ApprovalVotingSubsystem
	where C: SubsystemContext<Message = ApprovalVotingMessage>
{
332
	fn start(self, ctx: C) -> SpawnedSubsystem {
333
		let future = run::<C>(
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
			ctx,
			self,
			Box::new(SystemClock),
			Box::new(RealAssignmentCriteria),
		)
			.map_err(|e| SubsystemError::with_origin("approval-voting", e))
			.boxed();

		SpawnedSubsystem {
			name: "approval-voting-subsystem",
			future,
		}
	}
}

349
#[derive(Debug, Clone)]
350
351
352
353
354
355
356
357
358
359
struct ApprovalVoteRequest {
	validator_index: ValidatorIndex,
	block_hash: Hash,
}

#[derive(Default)]
struct Wakeups {
	// Tick -> [(Relay Block, Candidate Hash)]
	wakeups: BTreeMap<Tick, Vec<(Hash, CandidateHash)>>,
	reverse_wakeups: HashMap<(Hash, CandidateHash), Tick>,
360
	block_numbers: BTreeMap<BlockNumber, HashSet<Hash>>,
361
362
363
364
365
366
367
368
}

impl Wakeups {
	// Returns the first tick there exist wakeups for, if any.
	fn first(&self) -> Option<Tick> {
		self.wakeups.keys().next().map(|t| *t)
	}

369
370
371
372
	fn note_block(&mut self, block_hash: Hash, block_number: BlockNumber) {
		self.block_numbers.entry(block_number).or_default().insert(block_hash);
	}

373
374
	// Schedules a wakeup at the given tick. no-op if there is already an earlier or equal wake-up
	// for these values. replaces any later wakeup.
375
376
377
378
379
380
381
	fn schedule(
		&mut self,
		block_hash: Hash,
		block_number: BlockNumber,
		candidate_hash: CandidateHash,
		tick: Tick,
	) {
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
		if let Some(prev) = self.reverse_wakeups.get(&(block_hash, candidate_hash)) {
			if prev <= &tick { return }

			// we are replacing previous wakeup with an earlier one.
			if let Entry::Occupied(mut entry) = self.wakeups.entry(*prev) {
				if let Some(pos) = entry.get().iter()
					.position(|x| x == &(block_hash, candidate_hash))
				{
					entry.get_mut().remove(pos);
				}

				if entry.get().is_empty() {
					let _ = entry.remove_entry();
				}
			}
397
398
		} else {
			self.note_block(block_hash, block_number);
399
400
401
402
403
404
		}

		self.reverse_wakeups.insert((block_hash, candidate_hash), tick);
		self.wakeups.entry(tick).or_default().push((block_hash, candidate_hash));
	}

405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
	fn prune_finalized_wakeups(&mut self, finalized_number: BlockNumber) {
		let after = self.block_numbers.split_off(&(finalized_number + 1));
		let pruned_blocks: HashSet<_> = std::mem::replace(&mut self.block_numbers, after)
			.into_iter()
			.flat_map(|(_number, hashes)| hashes)
			.collect();

		let mut pruned_wakeups = BTreeMap::new();
		self.reverse_wakeups.retain(|&(ref h, ref c_h), tick| {
			let live = !pruned_blocks.contains(h);
			if !live {
				pruned_wakeups.entry(*tick)
					.or_insert_with(HashSet::new)
					.insert((*h, *c_h));
			}
			live
		});

		for (tick, pruned) in pruned_wakeups {
			if let Entry::Occupied(mut entry) = self.wakeups.entry(tick) {
				entry.get_mut().retain(|wakeup| !pruned.contains(wakeup));
				if entry.get().is_empty() {
					let _ = entry.remove();
				}
			}
		}
	}

433
434
435
436
437
	// Get the wakeup for a particular block/candidate combo, if any.
	fn wakeup_for(&self, block_hash: Hash, candidate_hash: CandidateHash) -> Option<Tick> {
		self.reverse_wakeups.get(&(block_hash, candidate_hash)).map(|t| *t)
	}

438
	// Returns the next wakeup. this future never returns if there are no wakeups.
439
	async fn next(&mut self, clock: &(dyn Clock + Sync)) -> (Tick, Hash, CandidateHash) {
440
441
442
443
444
445
446
447
448
449
450
451
452
		match self.first() {
			None => future::pending().await,
			Some(tick) => {
				clock.wait(tick).await;
				match self.wakeups.entry(tick) {
					Entry::Vacant(_) => panic!("entry is known to exist since `first` was `Some`; qed"),
					Entry::Occupied(mut entry) => {
						let (hash, candidate_hash) = entry.get_mut().pop()
							.expect("empty entries are removed here and in `schedule`; no other mutation of this map; qed");

						if entry.get().is_empty() {
							let _ = entry.remove();
						}
453

454
455
						self.reverse_wakeups.remove(&(hash, candidate_hash));

456
						(tick, hash, candidate_hash)
457
458
459
460
					}
				}
			}
		}
461
462
463
464
465
466
467
468
469
470
471
472
473
474
	}
}

/// A read-only handle to a database.
trait DBReader {
	/// Loads the block entry stored under `block_hash`, if any.
	fn load_block_entry(
		&self,
		block_hash: &Hash,
	) -> SubsystemResult<Option<BlockEntry>>;

	/// Loads the candidate entry stored under `candidate_hash`, if any.
	fn load_candidate_entry(
		&self,
		candidate_hash: &CandidateHash,
	) -> SubsystemResult<Option<CandidateEntry>>;

	/// Loads the hashes of all blocks known to the database.
	fn load_all_blocks(&self) -> SubsystemResult<Vec<Hash>>;
}

// This is a submodule to enforce opacity of the inner DB type.
mod approval_db_v1_reader {
	use super::{
482
		DBReader, KeyValueDB, Hash, CandidateHash, BlockEntry, CandidateEntry,
483
		SubsystemResult, SubsystemError, DatabaseConfig, approval_db,
484
485
486
	};

	/// A DB reader that uses the approval-db V1 under the hood.
487
488
489
490
	pub(super) struct ApprovalDBV1Reader<T> {
		inner: T,
		config: DatabaseConfig,
	}
491

492
493
494
495
496
497
	impl<T> ApprovalDBV1Reader<T> {
		pub(super) fn new(inner: T, config: DatabaseConfig) -> Self {
			ApprovalDBV1Reader {
				inner,
				config,
			}
498
499
500
		}
	}

501
502
503
	impl<'a, T: 'a> DBReader for ApprovalDBV1Reader<T>
		where T: std::ops::Deref<Target=(dyn KeyValueDB + 'a)>
	{
504
505
506
507
		fn load_block_entry(
			&self,
			block_hash: &Hash,
		) -> SubsystemResult<Option<BlockEntry>> {
508
			approval_db::v1::load_block_entry(&*self.inner, &self.config, block_hash)
509
510
511
512
513
514
515
516
				.map(|e| e.map(Into::into))
				.map_err(|e| SubsystemError::with_origin("approval-voting", e))
		}

		fn load_candidate_entry(
			&self,
			candidate_hash: &CandidateHash,
		) -> SubsystemResult<Option<CandidateEntry>> {
517
			approval_db::v1::load_candidate_entry(&*self.inner, &self.config, candidate_hash)
518
519
520
				.map(|e| e.map(Into::into))
				.map_err(|e| SubsystemError::with_origin("approval-voting", e))
		}
521
522
523
524
525

		fn load_all_blocks(&self) -> SubsystemResult<Vec<Hash>> {
			approval_db::v1::load_all_blocks(&*self.inner, &self.config)
				.map_err(|e| SubsystemError::with_origin("approval-voting", e))
		}
526
527
528
529
	}
}
use approval_db_v1_reader::ApprovalDBV1Reader;

530
531
532
533
534
535
// The approval picture for one block/candidate pair, as computed by
// `State::approval_status`.
struct ApprovalStatus {
	// Result of `approval_checking::tranches_to_approve` for the pair.
	required_tranches: RequiredTranches,
	// The tranche the clock reports for the block's slot at computation time.
	tranche_now: DelayTranche,
	// The tick corresponding to the block's slot.
	block_tick: Tick,
}

536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
// Final result of a launched approval check.
#[derive(Copy, Clone)]
enum ApprovalOutcome {
	// The check succeeded; approval votes are issued for this outcome.
	Approved,
	// The check completed without approving.
	Failed,
	// The check did not finish within `APPROVAL_CHECKING_TIMEOUT`.
	TimedOut,
}

// What a finished approval check reports back to the main loop.
struct ApprovalState {
	// Validator on whose behalf the check ran.
	validator_index: ValidatorIndex,
	// Candidate that was checked.
	candidate_hash: CandidateHash,
	// How the check ended.
	approval_outcome: ApprovalOutcome,
}

impl ApprovalState {
	fn approved(
		validator_index: ValidatorIndex,
		candidate_hash: CandidateHash,
	) -> Self {
		Self {
			validator_index,
			candidate_hash,
			approval_outcome: ApprovalOutcome::Approved,
		}
	}
	fn failed(
		validator_index: ValidatorIndex,
		candidate_hash: CandidateHash,
	) -> Self {
		Self {
			validator_index,
			candidate_hash,
			approval_outcome: ApprovalOutcome::Failed,
		}
	}
}

// Tracks approval checks that are in flight.
struct CurrentlyCheckingSet {
	// Candidate -> relay block hashes registered for it, kept sorted (inserted via
	// binary search).
	candidate_hash_map: HashMap<CandidateHash, Vec<Hash>>,
	// The in-flight checks themselves; each resolves to the final `ApprovalState`.
	currently_checking: FuturesUnordered<BoxFuture<'static, ApprovalState>>,
}

impl Default for CurrentlyCheckingSet {
	// An empty set: no registered relay blocks and no in-flight checks.
	fn default() -> Self {
		let candidate_hash_map = HashMap::default();
		let currently_checking = FuturesUnordered::default();

		Self { candidate_hash_map, currently_checking }
	}
}

impl CurrentlyCheckingSet {
	// Registers `relay_block` under `candidate_hash` and, if that relay block was not
	// already registered for the candidate, awaits `launch_work` and tracks the
	// resulting handle. `launch_work` is not polled otherwise.
	//
	// The tracked future resolves to the work's own `ApprovalState`, or to a
	// `TimedOut` state if the work takes longer than `APPROVAL_CHECKING_TIMEOUT`.
	//
	// Errors from `launch_work` are propagated to the caller.
	pub async fn insert_relay_block_hash(
		&mut self,
		candidate_hash: CandidateHash,
		validator_index: ValidatorIndex,
		relay_block: Hash,
		launch_work: impl Future<Output = SubsystemResult<RemoteHandle<ApprovalState>>>,
	) -> SubsystemResult<()> {
		let val = self.candidate_hash_map
			.entry(candidate_hash)
			.or_default();

		// `Err(k)` means "not present; `k` is the sorted insertion point".
		if let Err(k) = val.binary_search_by_key(&relay_block, |v| *v) {
			val.insert(k, relay_block);
			let work = launch_work.await?;
			self.currently_checking.push(
				Box::pin(async move {
					match work.timeout(APPROVAL_CHECKING_TIMEOUT).await {
						None => ApprovalState {
							candidate_hash,
							validator_index,
							approval_outcome: ApprovalOutcome::TimedOut,
						},
						Some(approval_state) => approval_state,
					}
				})
			);
		}

		Ok(())
	}

	// Waits for the next check to finish. Returns the relay blocks that were registered
	// for its candidate together with the final state, and records the outcome in
	// `approvals_cache`. Never resolves while there is nothing in flight.
	pub async fn next(
		&mut self,
		approvals_cache: &mut lru::LruCache<CandidateHash, ApprovalOutcome>,
	) -> (Vec<Hash>, ApprovalState) {
		if !self.currently_checking.is_empty() {
			if let Some(approval_state) = self.currently_checking
				.next()
				.await
			{
				let out = self.candidate_hash_map
					.remove(&approval_state.candidate_hash)
					.unwrap_or_default();
				// both values are `Copy`, so no explicit clone is needed.
				approvals_cache.put(approval_state.candidate_hash, approval_state.approval_outcome);
				return (out, approval_state);
			}
		}

		future::pending().await
	}
}

639
struct State<T> {
640
	session_window: RollingSessionWindow,
641
	keystore: Arc<LocalKeystore>,
642
643
644
645
646
647
648
649
650
651
	slot_duration_millis: u64,
	db: T,
	clock: Box<dyn Clock + Send + Sync>,
	assignment_criteria: Box<dyn AssignmentCriteria + Send + Sync>,
}

impl<T> State<T> {
	fn session_info(&self, i: SessionIndex) -> Option<&SessionInfo> {
		self.session_window.session_info(i)
	}
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697

	// Compute the required tranches for approval for this block and candidate combo.
	// Fails if there is no approval entry for the block under the candidate or no candidate entry
	// under the block, or if the session is out of bounds.
	fn approval_status<'a, 'b>(
		&'a self,
		block_entry: &'a BlockEntry,
		candidate_entry: &'b CandidateEntry,
	) -> Option<(&'b ApprovalEntry, ApprovalStatus)> {
		let session_info = match self.session_info(block_entry.session()) {
			Some(s) => s,
			None => {
				tracing::warn!(target: LOG_TARGET, "Unknown session info for {}", block_entry.session());
				return None;
			}
		};
		let block_hash = block_entry.block_hash();

		let tranche_now = self.clock.tranche_now(self.slot_duration_millis, block_entry.slot());
		let block_tick = slot_number_to_tick(self.slot_duration_millis, block_entry.slot());
		let no_show_duration = slot_number_to_tick(
			self.slot_duration_millis,
			Slot::from(u64::from(session_info.no_show_slots)),
		);

		if let Some(approval_entry) = candidate_entry.approval_entry(&block_hash) {
			let required_tranches = approval_checking::tranches_to_approve(
				approval_entry,
				candidate_entry.approvals(),
				tranche_now,
				block_tick,
				no_show_duration,
				session_info.needed_approvals as _
			);

			let status = ApprovalStatus {
				required_tranches,
				block_tick,
				tranche_now,
			};

			Some((approval_entry, status))
		} else {
			None
		}
	}
698
699
}

700
#[derive(Debug, Clone)]
701
702
703
enum Action {
	ScheduleWakeup {
		block_hash: Hash,
704
		block_number: BlockNumber,
705
706
707
708
709
710
		candidate_hash: CandidateHash,
		tick: Tick,
	},
	WriteBlockEntry(BlockEntry),
	WriteCandidateEntry(CandidateHash, CandidateEntry),
	LaunchApproval {
711
		candidate_hash: CandidateHash,
712
		indirect_cert: IndirectAssignmentCert,
713
		assignment_tranche: DelayTranche,
714
		relay_block_hash: Hash,
715
716
717
		candidate_index: CandidateIndex,
		session: SessionIndex,
		candidate: CandidateReceipt,
718
		backing_group: GroupIndex,
719
	},
720
	IssueApproval(CandidateHash, ApprovalVoteRequest),
721
	BecomeActive,
722
723
724
	Conclude,
}

725
async fn run<C>(
726
	mut ctx: C,
727
	mut subsystem: ApprovalVotingSubsystem,
728
729
730
	clock: Box<dyn Clock + Send + Sync>,
	assignment_criteria: Box<dyn AssignmentCriteria + Send + Sync>,
) -> SubsystemResult<()>
731
	where C: SubsystemContext<Message = ApprovalVotingMessage>
732
733
{
	let mut state = State {
734
		session_window: RollingSessionWindow::new(APPROVAL_SESSIONS),
735
736
		keystore: subsystem.keystore,
		slot_duration_millis: subsystem.slot_duration_millis,
737
		db: ApprovalDBV1Reader::new(subsystem.db.clone(), subsystem.db_config.clone()),
738
739
740
741
742
		clock,
		assignment_criteria,
	};

	let mut wakeups = Wakeups::default();
743
744
	let mut currently_checking_set = CurrentlyCheckingSet::default();
	let mut approvals_cache = lru::LruCache::new(APPROVAL_CACHE_SIZE);
745

746
747
748
749
750
751
	let mut last_finalized_height: Option<BlockNumber> = None;

	let db_writer = &*subsystem.db;

	loop {
		let actions = futures::select! {
752
			(tick, woken_block, woken_candidate) = wakeups.next(&*state.clock).fuse() => {
753
				subsystem.metrics.on_wakeup();
754
755
756
757
				process_wakeup(
					&mut state,
					woken_block,
					woken_candidate,
758
					tick,
759
				)?
760
761
			}
			next_msg = ctx.recv().fuse() => {
762
				let mut actions = handle_from_overseer(
763
764
					&mut ctx,
					&mut state,
765
					&subsystem.metrics,
766
					db_writer,
767
					subsystem.db_config,
768
769
					next_msg?,
					&mut last_finalized_height,
770
					&mut wakeups,
771
772
				).await?;

773
774
775
776
777
778
779
				if let Mode::Syncing(ref mut oracle) = subsystem.mode {
					if !oracle.is_major_syncing() {
						// note that we're active before processing other actions.
						actions.insert(0, Action::BecomeActive)
					}
				}

780
				actions
781
			}
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
			approval_state = currently_checking_set.next(&mut approvals_cache).fuse() => {
				let mut actions = Vec::new();
				let (
					relay_block_hashes,
					ApprovalState {
						validator_index,
						candidate_hash,
						approval_outcome,
					}
				) = approval_state;

				if matches!(approval_outcome, ApprovalOutcome::Approved) {
					let mut approvals: Vec<Action> = relay_block_hashes
						.into_iter()
						.map(|block_hash|
							Action::IssueApproval(
								candidate_hash,
								ApprovalVoteRequest {
									validator_index,
									block_hash,
								},
							)
						)
						.collect();
					actions.append(&mut approvals);
807
				}
808
809

				actions
810
811
812
813
814
			}
		};

		if handle_actions(
			&mut ctx,
815
			&mut state,
816
			&subsystem.metrics,
817
			&mut wakeups,
818
819
			&mut currently_checking_set,
			&mut approvals_cache,
820
			db_writer,
821
822
			subsystem.db_config,
			&mut subsystem.mode,
823
824
825
826
827
828
829
830
831
			actions,
		).await? {
			break;
		}
	}

	Ok(())
}

832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
// Handle actions is a function that accepts a set of instructions
// and subsequently updates the underlying approvals_db in accordance
// with the linear set of instructions passed in. Therefore, actions
// must be processed in series to ensure that earlier actions are not
// negated/corrupted by later actions being executed out-of-order.
//
// However, certain Actions can cause additional actions to need to be
// processed by this function. In order to preserve linearity, we would
// need to handle these newly generated actions before we finalize
// completing additional actions in the submitted sequence of actions.
//
// Since recursive async functions are not not stable yet, we are
// forced to modify the actions iterator on the fly whenever a new set
// of actions are generated by handling a single action.
//
// This particular problem statement is specified in issue 3311:
// 	https://github.com/paritytech/polkadot/issues/3311
//
850
851
852
// returns `true` if any of the actions was a `Conclude` command.
async fn handle_actions(
	ctx: &mut impl SubsystemContext,
853
	state: &mut State<impl DBReader>,
854
	metrics: &Metrics,
855
	wakeups: &mut Wakeups,
856
857
	currently_checking_set: &mut CurrentlyCheckingSet,
	approvals_cache: &mut lru::LruCache<CandidateHash, ApprovalOutcome>,
858
	db: &dyn KeyValueDB,
859
860
	db_config: DatabaseConfig,
	mode: &mut Mode,
861
	actions: Vec<Action>,
862
) -> SubsystemResult<bool> {
863
	let mut transaction = approval_db::v1::Transaction::new(db_config);
864
865
	let mut conclude = false;

866
867
	let mut actions_iter = actions.into_iter();
	while let Some(action) = actions_iter.next() {
868
869
870
		match action {
			Action::ScheduleWakeup {
				block_hash,
871
				block_number,
872
873
				candidate_hash,
				tick,
874
875
876
			} => {
				wakeups.schedule(block_hash, block_number, candidate_hash, tick)
			}
877
878
879
880
881
882
			Action::WriteBlockEntry(block_entry) => {
				transaction.put_block_entry(block_entry.into());
			}
			Action::WriteCandidateEntry(candidate_hash, candidate_entry) => {
				transaction.put_candidate_entry(candidate_hash, candidate_entry.into());
			}
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
			Action::IssueApproval(candidate_hash, approval_request) => {
					let mut sender = ctx.sender().clone();
					// Note that the IssueApproval action will create additional
					// actions that will need to all be processed before we can
					// handle the next action in the set passed to the ambient
					// function.
					//
					// In order to achieve this, we append the existing iterator
					// to the end of the iterator made up of these newly generated
					// actions.
					//
					// Note that chaining these iterators is O(n) as we must consume
					// the prior iterator.
					let next_actions: Vec<Action> = issue_approval(
						&mut sender,
						state,
						metrics,
						candidate_hash,
						approval_request,
					)?.into_iter().map(|v| v.clone()).chain(actions_iter).collect();
					actions_iter = next_actions.into_iter();
			}
905
			Action::LaunchApproval {
906
				candidate_hash,
907
				indirect_cert,
908
				assignment_tranche,
909
				relay_block_hash,
910
911
912
				candidate_index,
				session,
				candidate,
913
				backing_group,
914
			} => {
915
916
917
				// Don't launch approval work if the node is syncing.
				if let Mode::Syncing(_) = *mode { continue }

918
				metrics.on_assignment_produced(assignment_tranche);
919
920
921
				let block_hash = indirect_cert.block_hash;
				let validator_index = indirect_cert.validator;

922
				ctx.send_unbounded_message(ApprovalDistributionMessage::DistributeAssignment(
923
924
					indirect_cert,
					candidate_index,
925
				).into());
926

927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
				match approvals_cache.get(&candidate_hash) {
					Some(ApprovalOutcome::Approved) => {
						let new_actions: Vec<Action> = std::iter::once(
							Action::IssueApproval(
								candidate_hash,
								ApprovalVoteRequest {
									validator_index,
									block_hash,
								}
							)
						)
							.map(|v| v.clone())
							.chain(actions_iter)
							.collect();
						actions_iter = new_actions.into_iter();
					},
					None => {
						let ctx = &mut *ctx;
						currently_checking_set.insert_relay_block_hash(
							candidate_hash,
							validator_index,
							relay_block_hash,
							async move {
								launch_approval(
									ctx,
									metrics.clone(),
									session,
									candidate,
									validator_index,
									block_hash,
									backing_group,
								).await
							}
						).await?;
					}
					Some(_) => {},
963
				}
964
			}
965
966
967
968
969
970
971
972
973
			Action::BecomeActive => {
				*mode = Mode::Active;

				let messages = distribution_messages_for_activation(
					ApprovalDBV1Reader::new(db, db_config)
				)?;

				ctx.send_messages(messages.into_iter().map(Into::into)).await;
			}
974
975
976
977
			Action::Conclude => { conclude = true; }
		}
	}

978
979
980
981
982
983
	if !transaction.is_empty() {
		let _timer = metrics.time_db_transaction();

		transaction.write(db)
			.map_err(|e| SubsystemError::with_origin("approval-voting", e))?;
	}
984
985
986
987

	Ok(conclude)
}

988
989
990
991
992
993
994
995
996
997
998
999
1000
fn distribution_messages_for_activation<'a>(
	db: impl DBReader + 'a,
) -> SubsystemResult<Vec<ApprovalDistributionMessage>> {
	let all_blocks = db.load_all_blocks()?;

	let mut approval_meta = Vec::with_capacity(all_blocks.len());
	let mut messages = Vec::new();

	messages.push(ApprovalDistributionMessage::NewBlocks(Vec::new())); // dummy value.

	for block_hash in all_blocks {
		let block_entry = match db.load_block_entry(&block_hash)? {
			Some(b) => b,
For faster browsing, not all history is shown. View entire blame