lib.rs 68.9 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

//! The Approval Voting Subsystem.
//!
//! This subsystem is responsible for determining candidates to do approval checks
//! on, performing those approval checks, and tracking the assignments and approvals
//! of others. It uses this information to determine when candidates and blocks have
//! been sufficiently approved to finalize.

Shawn Tabrizi's avatar
Shawn Tabrizi committed
24
25
26
27
28
29
30
31
use kvdb::KeyValueDB;
use polkadot_node_jaeger as jaeger;
use polkadot_node_primitives::{
	approval::{
		BlockApprovalMeta, DelayTranche, IndirectAssignmentCert, IndirectSignedApprovalVote,
	},
	SignedDisputeStatement, ValidationResult,
};
32
use polkadot_node_subsystem::{
Shawn Tabrizi's avatar
Shawn Tabrizi committed
33
	errors::RecoveryError,
34
	messages::{
Shawn Tabrizi's avatar
Shawn Tabrizi committed
35
36
37
38
39
		ApprovalCheckError, ApprovalCheckResult, ApprovalDistributionMessage,
		ApprovalVotingMessage, AssignmentCheckError, AssignmentCheckResult,
		AvailabilityRecoveryMessage, BlockDescription, CandidateValidationMessage, ChainApiMessage,
		ChainSelectionMessage, DisputeCoordinatorMessage, HighestApprovedAncestorBlock,
		ImportStatementsResult, RuntimeApiMessage, RuntimeApiRequest,
40
	},
Shawn Tabrizi's avatar
Shawn Tabrizi committed
41
42
43
	overseer::{self, SubsystemSender as _},
	FromOverseer, OverseerSignal, SpawnedSubsystem, SubsystemContext, SubsystemError,
	SubsystemResult, SubsystemSender,
44
};
45
46
use polkadot_node_subsystem_util::{
	metrics::{self, prometheus},
47
	rolling_session_window::RollingSessionWindow,
Shawn Tabrizi's avatar
Shawn Tabrizi committed
48
	TimeoutExt,
49
};
50
use polkadot_primitives::v1::{
Shawn Tabrizi's avatar
Shawn Tabrizi committed
51
52
53
	ApprovalVote, BlockNumber, CandidateHash, CandidateIndex, CandidateReceipt, DisputeStatement,
	GroupIndex, Hash, SessionIndex, SessionInfo, ValidDisputeStatementKind, ValidatorId,
	ValidatorIndex, ValidatorPair, ValidatorSignature,
54
55
};
use sc_keystore::LocalKeystore;
Shawn Tabrizi's avatar
Shawn Tabrizi committed
56
use sp_application_crypto::Pair;
57
use sp_consensus::SyncOracle;
58
use sp_consensus_slots::Slot;
59

Shawn Tabrizi's avatar
Shawn Tabrizi committed
60
61
62
63
64
65
use futures::{
	channel::oneshot,
	future::{BoxFuture, RemoteHandle},
	prelude::*,
	stream::FuturesUnordered,
};
66

Shawn Tabrizi's avatar
Shawn Tabrizi committed
67
68
69
70
71
use std::{
	collections::{btree_map::Entry, BTreeMap, HashMap, HashSet},
	sync::Arc,
	time::Duration,
};
72
73
74

use approval_checking::RequiredTranches;
use criteria::{AssignmentCriteria, RealAssignmentCriteria};
Shawn Tabrizi's avatar
Shawn Tabrizi committed
75
76
use persisted_entries::{ApprovalEntry, BlockEntry, CandidateEntry};
use time::{slot_number_to_tick, Clock, ClockExt, SystemClock, Tick};
77
78
79

mod approval_checking;
mod approval_db;
80
mod backend;
81
82
mod criteria;
mod import;
83
mod ops;
84
mod persisted_entries;
Shawn Tabrizi's avatar
Shawn Tabrizi committed
85
mod time;
86

Shawn Tabrizi's avatar
Shawn Tabrizi committed
87
88
89
90
use crate::{
	approval_db::v1::{Config as DatabaseConfig, DbBackend},
	backend::{Backend, OverlayedBackend},
};
91

92
93
94
95
#[cfg(test)]
mod tests;

const APPROVAL_SESSIONS: SessionIndex = 6;
96
97
const APPROVAL_CHECKING_TIMEOUT: Duration = Duration::from_secs(120);
const APPROVAL_CACHE_SIZE: usize = 1024;
98
const TICK_TOO_FAR_IN_FUTURE: Tick = 20; // 10 seconds.
99
const LOG_TARGET: &str = "parachain::approval-voting";
100

101
/// Configuration for the approval voting subsystem
#[derive(Debug, Clone)]
pub struct Config {
	/// The column family in the DB where approval-voting data is stored.
	pub col_data: u32,
	/// The slot duration of the consensus algorithm, in milliseconds. Should be evenly
	/// divisible by 500.
	pub slot_duration_millis: u64,
}

111
112
113
114
115
116
117
118
119
120
121
122
123
124
/// The mode of the approval voting subsystem. It should start in a `Syncing` mode when it first
/// starts, and then once it's reached the head of the chain it should move into the `Active` mode.
///
/// In `Active` mode, the node is an active participant in the approvals protocol. When syncing,
/// the node follows the new incoming blocks and finalized number, but does not yet participate.
///
/// When transitioning from `Syncing` to `Active`, the node notifies the `ApprovalDistribution`
/// subsystem of all unfinalized blocks and the candidates included within them, as well as all
/// votes that the local node itself has cast on candidates within those blocks.
enum Mode {
	/// The node participates in the approvals protocol.
	Active,
	/// The node is still syncing; the boxed oracle is polled (`is_major_syncing`) to decide
	/// when to switch to `Active`.
	Syncing(Box<dyn SyncOracle + Send>),
}

125
/// The approval voting subsystem.
126
pub struct ApprovalVotingSubsystem {
Denis_P's avatar
Denis_P committed
127
	/// `LocalKeystore` is needed for assignment keys, but not necessarily approval keys.
128
129
	///
	/// We do a lot of VRF signing and need the keys to have low latency.
130
	keystore: Arc<LocalKeystore>,
131
	db_config: DatabaseConfig,
132
	slot_duration_millis: u64,
133
	db: Arc<dyn KeyValueDB>,
134
	mode: Mode,
135
136
137
138
139
140
	metrics: Metrics,
}

/// The registered Prometheus collectors backing [`Metrics`].
#[derive(Clone)]
struct MetricsInner {
	/// Number of candidates imported by the approval voting subsystem.
	imported_candidates_total: prometheus::Counter<prometheus::U64>,
	/// Assignments and tranches produced by the approval voting subsystem.
	assignments_produced: prometheus::Histogram,
	/// Number of approvals produced, labelled by `status`.
	approvals_produced_total: prometheus::CounterVec<prometheus::U64>,
	/// Number of assignments which became no-shows.
	no_shows_total: prometheus::Counter<prometheus::U64>,
	/// Number of times the subsystem woke up to process a candidate.
	wakeups_triggered_total: prometheus::Counter<prometheus::U64>,
	/// Number of ticks (500ms) to approve candidates.
	candidate_approval_time_ticks: prometheus::Histogram,
	/// Number of ticks (500ms) to approve blocks.
	block_approval_time_ticks: prometheus::Histogram,
	/// Time spent writing an approval db transaction.
	time_db_transaction: prometheus::Histogram,
	/// Time spent recovering and approving data in approval voting.
	time_recover_and_approve: prometheus::Histogram,
}

Denis_P's avatar
Denis_P committed
151
/// Approval Voting metrics.
152
153
154
155
156
157
158
159
160
161
#[derive(Default, Clone)]
pub struct Metrics(Option<MetricsInner>);

impl Metrics {
	fn on_candidate_imported(&self) {
		if let Some(metrics) = &self.0 {
			metrics.imported_candidates_total.inc();
		}
	}

162
	fn on_assignment_produced(&self, tranche: DelayTranche) {
163
		if let Some(metrics) = &self.0 {
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
			metrics.assignments_produced.observe(tranche as f64);
		}
	}

	fn on_approval_stale(&self) {
		if let Some(metrics) = &self.0 {
			metrics.approvals_produced_total.with_label_values(&["stale"]).inc()
		}
	}

	fn on_approval_invalid(&self) {
		if let Some(metrics) = &self.0 {
			metrics.approvals_produced_total.with_label_values(&["invalid"]).inc()
		}
	}

	fn on_approval_unavailable(&self) {
		if let Some(metrics) = &self.0 {
			metrics.approvals_produced_total.with_label_values(&["unavailable"]).inc()
		}
	}

	fn on_approval_error(&self) {
		if let Some(metrics) = &self.0 {
			metrics.approvals_produced_total.with_label_values(&["internal error"]).inc()
189
190
191
192
193
		}
	}

	fn on_approval_produced(&self) {
		if let Some(metrics) = &self.0 {
194
			metrics.approvals_produced_total.with_label_values(&["success"]).inc()
195
196
		}
	}
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220

	fn on_no_shows(&self, n: usize) {
		if let Some(metrics) = &self.0 {
			metrics.no_shows_total.inc_by(n as u64);
		}
	}

	fn on_wakeup(&self) {
		if let Some(metrics) = &self.0 {
			metrics.wakeups_triggered_total.inc();
		}
	}

	fn on_candidate_approved(&self, ticks: Tick) {
		if let Some(metrics) = &self.0 {
			metrics.candidate_approval_time_ticks.observe(ticks as f64);
		}
	}

	fn on_block_approved(&self, ticks: Tick) {
		if let Some(metrics) = &self.0 {
			metrics.block_approval_time_ticks.observe(ticks as f64);
		}
	}
221
222
223
224

	fn time_db_transaction(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
		self.0.as_ref().map(|metrics| metrics.time_db_transaction.start_timer())
	}
225
226
227
228

	fn time_recover_and_approve(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
		self.0.as_ref().map(|metrics| metrics.time_recover_and_approve.start_timer())
	}
229
230
231
232
233
234
235
236
237
238
239
240
241
242
}

impl metrics::Metrics for Metrics {
	fn try_register(
		registry: &prometheus::Registry,
	) -> std::result::Result<Self, prometheus::PrometheusError> {
		let metrics = MetricsInner {
			imported_candidates_total: prometheus::register(
				prometheus::Counter::new(
					"parachain_imported_candidates_total",
					"Number of candidates imported by the approval voting subsystem",
				)?,
				registry,
			)?,
243
244
245
246
247
248
			assignments_produced: prometheus::register(
				prometheus::Histogram::with_opts(
					prometheus::HistogramOpts::new(
						"parachain_assignments_produced",
						"Assignments and tranches produced by the approval voting subsystem",
					).buckets(vec![0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 15.0, 25.0, 40.0, 70.0]),
249
250
251
252
				)?,
				registry,
			)?,
			approvals_produced_total: prometheus::register(
253
254
255
256
257
258
				prometheus::CounterVec::new(
					prometheus::Opts::new(
						"parachain_approvals_produced_total",
						"Number of approvals produced by the approval voting subsystem",
					),
					&["status"]
259
260
261
				)?,
				registry,
			)?,
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
			no_shows_total: prometheus::register(
				prometheus::Counter::new(
					"parachain_approvals_no_shows_total",
					"Number of assignments which became no-shows in the approval voting subsystem",
				)?,
				registry,
			)?,
			wakeups_triggered_total: prometheus::register(
				prometheus::Counter::new(
					"parachain_approvals_wakeups_total",
					"Number of times we woke up to process a candidate in the approval voting subsystem",
				)?,
				registry,
			)?,
			candidate_approval_time_ticks: prometheus::register(
				prometheus::Histogram::with_opts(
					prometheus::HistogramOpts::new(
						"parachain_approvals_candidate_approval_time_ticks",
						"Number of ticks (500ms) to approve candidates.",
					).buckets(vec![6.0, 12.0, 18.0, 24.0, 30.0, 36.0, 72.0, 100.0, 144.0]),
				)?,
				registry,
			)?,
			block_approval_time_ticks: prometheus::register(
				prometheus::Histogram::with_opts(
					prometheus::HistogramOpts::new(
						"parachain_approvals_blockapproval_time_ticks",
						"Number of ticks (500ms) to approve blocks.",
					).buckets(vec![6.0, 12.0, 18.0, 24.0, 30.0, 36.0, 72.0, 100.0, 144.0]),
				)?,
				registry,
			)?,
294
295
296
297
298
299
300
301
302
			time_db_transaction: prometheus::register(
				prometheus::Histogram::with_opts(
					prometheus::HistogramOpts::new(
						"parachain_time_approval_db_transaction",
						"Time spent writing an approval db transaction.",
					)
				)?,
				registry,
			)?,
303
304
305
306
307
308
309
310
311
			time_recover_and_approve: prometheus::register(
				prometheus::Histogram::with_opts(
					prometheus::HistogramOpts::new(
						"parachain_time_recover_and_approve",
						"Time spent recovering and approving data in approval voting",
					)
				)?,
				registry,
			)?,
312
		};
313

314
315
		Ok(Metrics(Some(metrics)))
	}
316
317
}

318
impl ApprovalVotingSubsystem {
319
	/// Create a new approval voting subsystem with the given keystore, config, and database.
320
321
	pub fn with_config(
		config: Config,
322
		db: Arc<dyn KeyValueDB>,
323
		keystore: Arc<LocalKeystore>,
324
		sync_oracle: Box<dyn SyncOracle + Send>,
325
		metrics: Metrics,
326
327
	) -> Self {
		ApprovalVotingSubsystem {
328
			keystore,
329
			slot_duration_millis: config.slot_duration_millis,
330
			db,
Shawn Tabrizi's avatar
Shawn Tabrizi committed
331
			db_config: DatabaseConfig { col_data: config.col_data },
332
			mode: Mode::Syncing(sync_oracle),
333
			metrics,
334
		}
335
336
337
	}
}

338
339
340
341
impl<Context> overseer::Subsystem<Context, SubsystemError> for ApprovalVotingSubsystem
where
	Context: SubsystemContext<Message = ApprovalVotingMessage>,
	Context: overseer::SubsystemContext<Message = ApprovalVotingMessage>,
342
{
343
	fn start(self, ctx: Context) -> SpawnedSubsystem {
344
		let backend = DbBackend::new(self.db.clone(), self.db_config);
345
		let future = run::<DbBackend, Context>(
346
347
348
349
			ctx,
			self,
			Box::new(SystemClock),
			Box::new(RealAssignmentCriteria),
350
			backend,
351
		)
Shawn Tabrizi's avatar
Shawn Tabrizi committed
352
353
		.map_err(|e| SubsystemError::with_origin("approval-voting", e))
		.boxed();
354

Shawn Tabrizi's avatar
Shawn Tabrizi committed
355
		SpawnedSubsystem { name: "approval-voting-subsystem", future }
356
357
358
	}
}

359
#[derive(Debug, Clone)]
360
361
362
363
364
365
366
367
368
369
struct ApprovalVoteRequest {
	validator_index: ValidatorIndex,
	block_hash: Hash,
}

#[derive(Default)]
struct Wakeups {
	// Tick -> [(Relay Block, Candidate Hash)]
	wakeups: BTreeMap<Tick, Vec<(Hash, CandidateHash)>>,
	reverse_wakeups: HashMap<(Hash, CandidateHash), Tick>,
370
	block_numbers: BTreeMap<BlockNumber, HashSet<Hash>>,
371
372
373
374
375
376
377
378
}

impl Wakeups {
	// Returns the first tick there exist wakeups for, if any.
	fn first(&self) -> Option<Tick> {
		self.wakeups.keys().next().map(|t| *t)
	}

379
380
381
382
	fn note_block(&mut self, block_hash: Hash, block_number: BlockNumber) {
		self.block_numbers.entry(block_number).or_default().insert(block_hash);
	}

383
384
	// Schedules a wakeup at the given tick. no-op if there is already an earlier or equal wake-up
	// for these values. replaces any later wakeup.
385
386
387
388
389
390
391
	fn schedule(
		&mut self,
		block_hash: Hash,
		block_number: BlockNumber,
		candidate_hash: CandidateHash,
		tick: Tick,
	) {
392
		if let Some(prev) = self.reverse_wakeups.get(&(block_hash, candidate_hash)) {
Shawn Tabrizi's avatar
Shawn Tabrizi committed
393
394
395
			if prev <= &tick {
				return
			}
396
397
398

			// we are replacing previous wakeup with an earlier one.
			if let Entry::Occupied(mut entry) = self.wakeups.entry(*prev) {
Shawn Tabrizi's avatar
Shawn Tabrizi committed
399
400
				if let Some(pos) =
					entry.get().iter().position(|x| x == &(block_hash, candidate_hash))
401
402
403
404
405
406
407
408
				{
					entry.get_mut().remove(pos);
				}

				if entry.get().is_empty() {
					let _ = entry.remove_entry();
				}
			}
409
410
		} else {
			self.note_block(block_hash, block_number);
411
412
413
414
415
416
		}

		self.reverse_wakeups.insert((block_hash, candidate_hash), tick);
		self.wakeups.entry(tick).or_default().push((block_hash, candidate_hash));
	}

417
418
419
420
421
422
423
424
425
426
427
	fn prune_finalized_wakeups(&mut self, finalized_number: BlockNumber) {
		let after = self.block_numbers.split_off(&(finalized_number + 1));
		let pruned_blocks: HashSet<_> = std::mem::replace(&mut self.block_numbers, after)
			.into_iter()
			.flat_map(|(_number, hashes)| hashes)
			.collect();

		let mut pruned_wakeups = BTreeMap::new();
		self.reverse_wakeups.retain(|&(ref h, ref c_h), tick| {
			let live = !pruned_blocks.contains(h);
			if !live {
Shawn Tabrizi's avatar
Shawn Tabrizi committed
428
				pruned_wakeups.entry(*tick).or_insert_with(HashSet::new).insert((*h, *c_h));
429
430
431
432
433
434
435
436
437
438
439
440
441
442
			}
			live
		});

		for (tick, pruned) in pruned_wakeups {
			if let Entry::Occupied(mut entry) = self.wakeups.entry(tick) {
				entry.get_mut().retain(|wakeup| !pruned.contains(wakeup));
				if entry.get().is_empty() {
					let _ = entry.remove();
				}
			}
		}
	}

443
444
445
446
447
	// Get the wakeup for a particular block/candidate combo, if any.
	fn wakeup_for(&self, block_hash: Hash, candidate_hash: CandidateHash) -> Option<Tick> {
		self.reverse_wakeups.get(&(block_hash, candidate_hash)).map(|t| *t)
	}

448
	// Returns the next wakeup. this future never returns if there are no wakeups.
449
	async fn next(&mut self, clock: &(dyn Clock + Sync)) -> (Tick, Hash, CandidateHash) {
450
451
452
453
454
		match self.first() {
			None => future::pending().await,
			Some(tick) => {
				clock.wait(tick).await;
				match self.wakeups.entry(tick) {
455
456
457
					Entry::Vacant(_) => {
						panic!("entry is known to exist since `first` was `Some`; qed")
					},
458
459
460
461
462
463
464
					Entry::Occupied(mut entry) => {
						let (hash, candidate_hash) = entry.get_mut().pop()
							.expect("empty entries are removed here and in `schedule`; no other mutation of this map; qed");

						if entry.get().is_empty() {
							let _ = entry.remove();
						}
465

466
467
						self.reverse_wakeups.remove(&(hash, candidate_hash));

468
						(tick, hash, candidate_hash)
Shawn Tabrizi's avatar
Shawn Tabrizi committed
469
					},
470
				}
Shawn Tabrizi's avatar
Shawn Tabrizi committed
471
			},
472
		}
473
474
475
	}
}

476
477
478
479
480
481
/// Snapshot of how far along the approval of a candidate under a given block is.
struct ApprovalStatus {
	/// Result of `approval_checking::tranches_to_approve` for the approval entry.
	required_tranches: RequiredTranches,
	/// The current delay tranche, as reported by the clock for the block's slot.
	tranche_now: DelayTranche,
	/// The tick corresponding to the block's slot.
	block_tick: Tick,
}

482
483
484
485
486
487
488
489
490
491
492
493
494
495
/// Result of a launched approval check.
#[derive(Copy, Clone)]
enum ApprovalOutcome {
	/// The check succeeded; the main loop issues approvals for this outcome only.
	Approved,
	/// The check did not succeed.
	Failed,
	/// The check did not finish within `APPROVAL_CHECKING_TIMEOUT`.
	TimedOut,
}

/// Outcome of an approval check, together with the validator and candidate it was run for.
struct ApprovalState {
	/// Index of the validator the check was launched for.
	validator_index: ValidatorIndex,
	/// Hash of the checked candidate.
	candidate_hash: CandidateHash,
	/// How the check ended.
	approval_outcome: ApprovalOutcome,
}

impl ApprovalState {
Shawn Tabrizi's avatar
Shawn Tabrizi committed
496
497
	fn approved(validator_index: ValidatorIndex, candidate_hash: CandidateHash) -> Self {
		Self { validator_index, candidate_hash, approval_outcome: ApprovalOutcome::Approved }
498
	}
Shawn Tabrizi's avatar
Shawn Tabrizi committed
499
500
	fn failed(validator_index: ValidatorIndex, candidate_hash: CandidateHash) -> Self {
		Self { validator_index, candidate_hash, approval_outcome: ApprovalOutcome::Failed }
501
502
503
504
505
506
507
508
509
510
	}
}

/// The set of approval checks that are currently in flight.
struct CurrentlyCheckingSet {
	/// Candidate hash -> relay block hashes registered for that candidate (kept sorted).
	candidate_hash_map: HashMap<CandidateHash, Vec<Hash>>,
	/// The pending checks; each future resolves to the resulting `ApprovalState`.
	currently_checking: FuturesUnordered<BoxFuture<'static, ApprovalState>>,
}

impl Default for CurrentlyCheckingSet {
	fn default() -> Self {
Shawn Tabrizi's avatar
Shawn Tabrizi committed
511
		Self { candidate_hash_map: HashMap::new(), currently_checking: FuturesUnordered::new() }
512
513
514
515
516
517
518
519
520
521
522
523
524
	}
}

impl CurrentlyCheckingSet {
	// This function will lazily launch approval voting work whenever the
	// candidate is not already undergoing validation.
	pub async fn insert_relay_block_hash(
		&mut self,
		candidate_hash: CandidateHash,
		validator_index: ValidatorIndex,
		relay_block: Hash,
		launch_work: impl Future<Output = SubsystemResult<RemoteHandle<ApprovalState>>>,
	) -> SubsystemResult<()> {
Shawn Tabrizi's avatar
Shawn Tabrizi committed
525
		let val = self.candidate_hash_map.entry(candidate_hash).or_insert(Default::default());
526
527
528
529

		if let Err(k) = val.binary_search_by_key(&relay_block, |v| *v) {
			let _ = val.insert(k, relay_block);
			let work = launch_work.await?;
Shawn Tabrizi's avatar
Shawn Tabrizi committed
530
531
532
533
534
535
536
537
538
539
			self.currently_checking.push(Box::pin(async move {
				match work.timeout(APPROVAL_CHECKING_TIMEOUT).await {
					None => ApprovalState {
						candidate_hash,
						validator_index,
						approval_outcome: ApprovalOutcome::TimedOut,
					},
					Some(approval_state) => approval_state,
				}
			}));
540
541
542
543
544
545
546
547
548
549
		}

		Ok(())
	}

	pub async fn next(
		&mut self,
		approvals_cache: &mut lru::LruCache<CandidateHash, ApprovalOutcome>,
	) -> (Vec<Hash>, ApprovalState) {
		if !self.currently_checking.is_empty() {
Shawn Tabrizi's avatar
Shawn Tabrizi committed
550
551
552
553
554
555
556
557
558
559
			if let Some(approval_state) = self.currently_checking.next().await {
				let out = self
					.candidate_hash_map
					.remove(&approval_state.candidate_hash)
					.unwrap_or_default();
				approvals_cache.put(
					approval_state.candidate_hash.clone(),
					approval_state.approval_outcome.clone(),
				);
				return (out, approval_state)
560
561
562
563
564
565
566
			}
		}

		future::pending().await
	}
}

567
struct State {
568
	session_window: RollingSessionWindow,
569
	keystore: Arc<LocalKeystore>,
570
571
572
573
574
	slot_duration_millis: u64,
	clock: Box<dyn Clock + Send + Sync>,
	assignment_criteria: Box<dyn AssignmentCriteria + Send + Sync>,
}

575
impl State {
576
577
578
	fn session_info(&self, i: SessionIndex) -> Option<&SessionInfo> {
		self.session_window.session_info(i)
	}
579
580
581
582
583
584
585
586
587
588
589
590

	// Compute the required tranches for approval for this block and candidate combo.
	// Fails if there is no approval entry for the block under the candidate or no candidate entry
	// under the block, or if the session is out of bounds.
	fn approval_status<'a, 'b>(
		&'a self,
		block_entry: &'a BlockEntry,
		candidate_entry: &'b CandidateEntry,
	) -> Option<(&'b ApprovalEntry, ApprovalStatus)> {
		let session_info = match self.session_info(block_entry.session()) {
			Some(s) => s,
			None => {
Shawn Tabrizi's avatar
Shawn Tabrizi committed
591
592
593
594
595
596
597
				tracing::warn!(
					target: LOG_TARGET,
					"Unknown session info for {}",
					block_entry.session()
				);
				return None
			},
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
		};
		let block_hash = block_entry.block_hash();

		let tranche_now = self.clock.tranche_now(self.slot_duration_millis, block_entry.slot());
		let block_tick = slot_number_to_tick(self.slot_duration_millis, block_entry.slot());
		let no_show_duration = slot_number_to_tick(
			self.slot_duration_millis,
			Slot::from(u64::from(session_info.no_show_slots)),
		);

		if let Some(approval_entry) = candidate_entry.approval_entry(&block_hash) {
			let required_tranches = approval_checking::tranches_to_approve(
				approval_entry,
				candidate_entry.approvals(),
				tranche_now,
				block_tick,
				no_show_duration,
Shawn Tabrizi's avatar
Shawn Tabrizi committed
615
				session_info.needed_approvals as _,
616
617
			);

Shawn Tabrizi's avatar
Shawn Tabrizi committed
618
			let status = ApprovalStatus { required_tranches, block_tick, tranche_now };
619
620
621
622
623
624

			Some((approval_entry, status))
		} else {
			None
		}
	}
625
626
}

627
#[derive(Debug, Clone)]
628
629
630
enum Action {
	ScheduleWakeup {
		block_hash: Hash,
631
		block_number: BlockNumber,
632
633
634
635
		candidate_hash: CandidateHash,
		tick: Tick,
	},
	LaunchApproval {
636
		candidate_hash: CandidateHash,
637
		indirect_cert: IndirectAssignmentCert,
638
		assignment_tranche: DelayTranche,
639
		relay_block_hash: Hash,
640
641
642
		candidate_index: CandidateIndex,
		session: SessionIndex,
		candidate: CandidateReceipt,
643
		backing_group: GroupIndex,
644
	},
645
646
647
648
649
650
651
	InformDisputeCoordinator {
		candidate_hash: CandidateHash,
		candidate_receipt: CandidateReceipt,
		session: SessionIndex,
		dispute_statement: SignedDisputeStatement,
		validator_index: ValidatorIndex,
	},
652
	NoteApprovedInChainSelection(Hash),
653
	IssueApproval(CandidateHash, ApprovalVoteRequest),
654
	BecomeActive,
655
656
657
	Conclude,
}

658
659
async fn run<B, Context>(
	mut ctx: Context,
660
	mut subsystem: ApprovalVotingSubsystem,
661
662
	clock: Box<dyn Clock + Send + Sync>,
	assignment_criteria: Box<dyn AssignmentCriteria + Send + Sync>,
663
	mut backend: B,
664
) -> SubsystemResult<()>
Shawn Tabrizi's avatar
Shawn Tabrizi committed
665
666
667
668
where
	Context: SubsystemContext<Message = ApprovalVotingMessage>,
	Context: overseer::SubsystemContext<Message = ApprovalVotingMessage>,
	B: Backend,
669
670
{
	let mut state = State {
671
		session_window: RollingSessionWindow::new(APPROVAL_SESSIONS),
672
673
674
675
676
677
678
		keystore: subsystem.keystore,
		slot_duration_millis: subsystem.slot_duration_millis,
		clock,
		assignment_criteria,
	};

	let mut wakeups = Wakeups::default();
679
680
	let mut currently_checking_set = CurrentlyCheckingSet::default();
	let mut approvals_cache = lru::LruCache::new(APPROVAL_CACHE_SIZE);
681

682
683
684
	let mut last_finalized_height: Option<BlockNumber> = None;

	loop {
685
		let mut overlayed_db = OverlayedBackend::new(&backend);
686
		let actions = futures::select! {
687
			(tick, woken_block, woken_candidate) = wakeups.next(&*state.clock).fuse() => {
688
				subsystem.metrics.on_wakeup();
689
690
				process_wakeup(
					&mut state,
691
					&mut overlayed_db,
692
693
					woken_block,
					woken_candidate,
694
					tick,
695
				)?
696
697
			}
			next_msg = ctx.recv().fuse() => {
698
				let mut actions = handle_from_overseer(
699
700
					&mut ctx,
					&mut state,
701
					&mut overlayed_db,
702
					&subsystem.metrics,
703
704
					next_msg?,
					&mut last_finalized_height,
705
					&mut wakeups,
706
707
				).await?;

708
709
710
711
712
713
714
				if let Mode::Syncing(ref mut oracle) = subsystem.mode {
					if !oracle.is_major_syncing() {
						// note that we're active before processing other actions.
						actions.insert(0, Action::BecomeActive)
					}
				}

715
				actions
716
			}
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
			approval_state = currently_checking_set.next(&mut approvals_cache).fuse() => {
				let mut actions = Vec::new();
				let (
					relay_block_hashes,
					ApprovalState {
						validator_index,
						candidate_hash,
						approval_outcome,
					}
				) = approval_state;

				if matches!(approval_outcome, ApprovalOutcome::Approved) {
					let mut approvals: Vec<Action> = relay_block_hashes
						.into_iter()
						.map(|block_hash|
							Action::IssueApproval(
								candidate_hash,
								ApprovalVoteRequest {
									validator_index,
									block_hash,
								},
							)
						)
						.collect();
					actions.append(&mut approvals);
742
				}
743
744

				actions
745
746
747
748
749
			}
		};

		if handle_actions(
			&mut ctx,
750
			&mut state,
751
			&mut overlayed_db,
752
			&subsystem.metrics,
753
			&mut wakeups,
754
755
			&mut currently_checking_set,
			&mut approvals_cache,
756
			&mut subsystem.mode,
757
			actions,
Shawn Tabrizi's avatar
Shawn Tabrizi committed
758
759
760
761
		)
		.await?
		{
			break
762
		}
763
764
765
766
767
768
769

		if !overlayed_db.is_empty() {
			let _timer = subsystem.metrics.time_db_transaction();

			let ops = overlayed_db.into_write_ops();
			backend.write(ops)?;
		}
770
771
772
773
774
	}

	Ok(())
}

775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
// Handle actions is a function that accepts a set of instructions
// and subsequently updates the underlying approvals_db in accordance
// with the linear set of instructions passed in. Therefore, actions
// must be processed in series to ensure that earlier actions are not
// negated/corrupted by later actions being executed out-of-order.
//
// However, certain Actions can cause additional actions to need to be
// processed by this function. In order to preserve linearity, we would
// need to handle these newly generated actions before we finalize
// completing additional actions in the submitted sequence of actions.
//
// Since recursive async functions are not not stable yet, we are
// forced to modify the actions iterator on the fly whenever a new set
// of actions are generated by handling a single action.
//
// This particular problem statement is specified in issue 3311:
// 	https://github.com/paritytech/polkadot/issues/3311
//
793
794
// returns `true` if any of the actions was a `Conclude` command.
async fn handle_actions(
Shawn Tabrizi's avatar
Shawn Tabrizi committed
795
796
	ctx: &mut (impl SubsystemContext<Message = ApprovalVotingMessage>
	          + overseer::SubsystemContext<Message = ApprovalVotingMessage>),
797
798
	state: &mut State,
	overlayed_db: &mut OverlayedBackend<'_, impl Backend>,
799
	metrics: &Metrics,
800
	wakeups: &mut Wakeups,
801
802
	currently_checking_set: &mut CurrentlyCheckingSet,
	approvals_cache: &mut lru::LruCache<CandidateHash, ApprovalOutcome>,
803
	mode: &mut Mode,
804
	actions: Vec<Action>,
805
806
807
) -> SubsystemResult<bool> {
	let mut conclude = false;

808
809
	let mut actions_iter = actions.into_iter();
	while let Some(action) = actions_iter.next() {
810
		match action {
Shawn Tabrizi's avatar
Shawn Tabrizi committed
811
812
			Action::ScheduleWakeup { block_hash, block_number, candidate_hash, tick } =>
				wakeups.schedule(block_hash, block_number, candidate_hash, tick),
813
			Action::IssueApproval(candidate_hash, approval_request) => {
Shawn Tabrizi's avatar
Shawn Tabrizi committed
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
				let mut sender = ctx.sender().clone();
				// Note that the IssueApproval action will create additional
				// actions that will need to all be processed before we can
				// handle the next action in the set passed to the ambient
				// function.
				//
				// In order to achieve this, we append the existing iterator
				// to the end of the iterator made up of these newly generated
				// actions.
				//
				// Note that chaining these iterators is O(n) as we must consume
				// the prior iterator.
				let next_actions: Vec<Action> = issue_approval(
					&mut sender,
					state,
					overlayed_db,
					metrics,
					candidate_hash,
					approval_request,
				)
				.await?
				.into_iter()
				.map(|v| v.clone())
				.chain(actions_iter)
				.collect();

				actions_iter = next_actions.into_iter();
			},
842
			Action::LaunchApproval {
843
				candidate_hash,
844
				indirect_cert,
845
				assignment_tranche,
846
				relay_block_hash,
847
848
849
				candidate_index,
				session,
				candidate,
850
				backing_group,
851
			} => {
852
				// Don't launch approval work if the node is syncing.
Shawn Tabrizi's avatar
Shawn Tabrizi committed
853
854
855
				if let Mode::Syncing(_) = *mode {
					continue
				}
856

857
				metrics.on_assignment_produced(assignment_tranche);
858
859
860
				let block_hash = indirect_cert.block_hash;
				let validator_index = indirect_cert.validator;

861
				ctx.send_unbounded_message(ApprovalDistributionMessage::DistributeAssignment(
862
863
					indirect_cert,
					candidate_index,
864
				));
865

866
867
				match approvals_cache.get(&candidate_hash) {
					Some(ApprovalOutcome::Approved) => {
Shawn Tabrizi's avatar
Shawn Tabrizi committed
868
869
870
871
872
873
874
						let new_actions: Vec<Action> = std::iter::once(Action::IssueApproval(
							candidate_hash,
							ApprovalVoteRequest { validator_index, block_hash },
						))
						.map(|v| v.clone())
						.chain(actions_iter)
						.collect();
875
876
877
878
						actions_iter = new_actions.into_iter();
					},
					None => {
						let ctx = &mut *ctx;
Shawn Tabrizi's avatar
Shawn Tabrizi committed
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
						currently_checking_set
							.insert_relay_block_hash(
								candidate_hash,
								validator_index,
								relay_block_hash,
								async move {
									launch_approval(
										ctx,
										metrics.clone(),
										session,
										candidate,
										validator_index,
										block_hash,
										backing_group,
									)
									.await
								},
							)
							.await?;
					},
899
					Some(_) => {},
900
				}
Shawn Tabrizi's avatar
Shawn Tabrizi committed
901
			},
902
903
904
905
906
907
908
909
910
911
912
913
914
915
			Action::InformDisputeCoordinator {
				candidate_hash,
				candidate_receipt,
				session,
				dispute_statement,
				validator_index,
			} => {
				let (pending_confirmation, confirmation_rx) = oneshot::channel();
				ctx.send_message(DisputeCoordinatorMessage::ImportStatements {
					candidate_hash,
					candidate_receipt,
					session,
					statements: vec![(dispute_statement, validator_index)],
					pending_confirmation,
Shawn Tabrizi's avatar
Shawn Tabrizi committed
916
917
				})
				.await;
918
919

				match confirmation_rx.await {
920
921
922
					Err(oneshot::Canceled) => {
						tracing::warn!(target: LOG_TARGET, "Dispute coordinator confirmation lost",)
					},
Shawn Tabrizi's avatar
Shawn Tabrizi committed
923
					Ok(ImportStatementsResult::ValidImport) => {},
924
925
926
927
928
					Ok(ImportStatementsResult::InvalidImport) => tracing::warn!(
						target: LOG_TARGET,
						"Failed to import statements of validity",
					),
				}
Shawn Tabrizi's avatar
Shawn Tabrizi committed
929
			},
930
			Action::NoteApprovedInChainSelection(block_hash) => {
931
				ctx.send_message(ChainSelectionMessage::Approved(block_hash)).await;
Shawn Tabrizi's avatar
Shawn Tabrizi committed
932
			},
933
934
935
			Action::BecomeActive => {
				*mode = Mode::Active;

936
				let messages = distribution_messages_for_activation(overlayed_db)?;
937

938
				ctx.send_messages(messages.into_iter()).await;
Shawn Tabrizi's avatar
Shawn Tabrizi committed
939
940
941
942
			},
			Action::Conclude => {
				conclude = true;
			},
943
944
945
946
947
948
		}
	}

	Ok(conclude)
}

949
950
fn distribution_messages_for_activation(
	db: &OverlayedBackend<'_, impl Backend>,
951
) -> SubsystemResult<Vec<ApprovalDistributionMessage>> {
952
	let all_blocks: Vec<Hash> = db.load_all_blocks()?;
953
954
955
956
957
958
959
960
961
962

	let mut approval_meta = Vec::with_capacity(all_blocks.len());
	let mut messages = Vec::new();

	messages.push(ApprovalDistributionMessage::NewBlocks(Vec::new())); // dummy value.

	for block_hash in all_blocks {
		let block_entry = match db.load_block_entry(&block_hash)? {
			Some(b) => b,
			None => {
Shawn Tabrizi's avatar
Shawn Tabrizi committed
963
				tracing::warn!(target: LOG_TARGET, ?block_hash, "Missing block entry",);
964
965

				continue
Shawn Tabrizi's avatar
Shawn Tabrizi committed
966
			},
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
		};
		approval_meta.push(BlockApprovalMeta {
			hash: block_hash,
			number: block_entry.block_number(),
			parent_hash: block_entry.parent_hash(),
			candidates: block_entry.candidates().iter().map(|(_, c_hash)| *c_hash).collect(),
			slot: block_entry.slot(),
		});

		for (i, (_, candidate_hash)) in block_entry.candidates().iter().enumerate() {
			let candidate_entry = match db.load_candidate_entry(&candidate_hash)? {
				Some(c) => c,
				None => {
					tracing::warn!(
						target: LOG_TARGET,
						?block_hash,
						?candidate_hash,
						"Missing candidate entry",
					);

					continue
Shawn Tabrizi's avatar
Shawn Tabrizi committed
988
				},
989
990
991
992
993
994
995
996
997
998
999
1000
			};

			match candidate_entry.approval_entry(&block_hash) {
				Some(approval_entry) => {
					match approval_entry.local_statements() {
						(None, None) | (None, Some(_)) => {}, // second is impossible case.
						(Some(assignment), None) => {
							messages.push(ApprovalDistributionMessage::DistributeAssignment(
								IndirectAssignmentCert {
									block_hash,
									validator: assignment.validator_index(),
									cert: assignment.cert().clone(),
For faster browsing, not all history is shown. View entire blame