// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

//! The Approval Voting Subsystem.
//!
//! This subsystem is responsible for determining candidates to do approval checks
//! on, performing those approval checks, and tracking the assignments and approvals
//! of others. It uses this information to determine when candidates and blocks have
//! been sufficiently approved to finalize.
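//!
//! A rough, illustrative sketch of how an embedder might construct and start this subsystem;
//! the `db`, `keystore` and `sync_oracle` handles below are placeholders, not part of this module:
//!
//! ```ignore
//! let subsystem = ApprovalVotingSubsystem::with_config(
//! 	Config { col_data: 0, slot_duration_millis: 6_000 },
//! 	db,          // Arc<dyn KeyValueDB>
//! 	keystore,    // Arc<LocalKeystore>
//! 	sync_oracle, // Box<dyn SyncOracle + Send>, reports whether we are still major-syncing
//! 	Metrics::default(),
//! );
//! // `Subsystem::start` then hands the running future to the overseer.
//! ```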

use polkadot_node_subsystem::{
	messages::{
		AssignmentCheckError, AssignmentCheckResult, ApprovalCheckError, ApprovalCheckResult,
		ApprovalVotingMessage, RuntimeApiMessage, RuntimeApiRequest, ChainApiMessage,
		ApprovalDistributionMessage, ValidationFailed, CandidateValidationMessage,
		AvailabilityRecoveryMessage,
	},
	errors::RecoveryError,
	Subsystem, SubsystemContext, SubsystemError, SubsystemResult, SpawnedSubsystem,
	FromOverseer, OverseerSignal,
};
use polkadot_node_subsystem_util::{
	metrics::{self, prometheus},
	rolling_session_window::RollingSessionWindow,
};
use polkadot_primitives::v1::{
	ValidatorIndex, Hash, SessionIndex, SessionInfo, CandidateHash,
	CandidateReceipt, BlockNumber, PersistedValidationData,
	ValidationCode, CandidateDescriptor, ValidatorPair, ValidatorSignature, ValidatorId,
	CandidateIndex, GroupIndex, ApprovalVote,
};
use polkadot_node_primitives::{ValidationResult, PoV};
use polkadot_node_primitives::approval::{
	IndirectAssignmentCert, IndirectSignedApprovalVote, DelayTranche, BlockApprovalMeta,
};
use polkadot_node_jaeger as jaeger;
use sc_keystore::LocalKeystore;
use sp_consensus::SyncOracle;
use sp_consensus_slots::Slot;
use sp_runtime::traits::AppVerify;
use sp_application_crypto::Pair;
use kvdb::KeyValueDB;
use futures::prelude::*;
use futures::channel::{mpsc, oneshot};
use futures::future::RemoteHandle;

use std::collections::{BTreeMap, HashMap, HashSet};
use std::collections::btree_map::Entry;
use std::sync::Arc;

use approval_checking::RequiredTranches;
use persisted_entries::{ApprovalEntry, CandidateEntry, BlockEntry};
use criteria::{AssignmentCriteria, RealAssignmentCriteria};
use time::{slot_number_to_tick, Tick, Clock, ClockExt, SystemClock};

mod approval_checking;
mod approval_db;
mod criteria;
mod import;
mod time;
mod persisted_entries;

use crate::approval_db::v1::Config as DatabaseConfig;

#[cfg(test)]
mod tests;

const APPROVAL_SESSIONS: SessionIndex = 6;

const LOG_TARGET: &str = "parachain::approval-voting";

/// Configuration for the approval voting subsystem
#[derive(Debug, Clone)]
pub struct Config {
	/// The column family in the DB where approval-voting data is stored.
	pub col_data: u32,
	/// The slot duration of the consensus algorithm, in milliseconds. Should be evenly
	/// divisible by 500.
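	/// For example, a slot duration of 6000 ms corresponds to 12 ticks of 500 ms each.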
	pub slot_duration_millis: u64,
}

// The mode of the approval voting subsystem. It starts in `Syncing` mode and transitions to
// `Active` mode once it has reached the head of the chain.
//
// In `Active` mode, the node is an active participant in the approvals protocol. When syncing,
// the node follows the new incoming blocks and finalized number, but does not yet participate.
//
// When transitioning from `Syncing` to `Active`, the node notifies the `ApprovalDistribution`
// subsystem of all unfinalized blocks and the candidates included within them, as well as all
// votes that the local node itself has cast on candidates within those blocks.
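// On that transition, `handle_actions` processes `Action::BecomeActive` and sends out the
// messages produced by `distribution_messages_for_activation` below.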
enum Mode {
	Active,
	Syncing(Box<dyn SyncOracle + Send>),
}

/// The approval voting subsystem.
pub struct ApprovalVotingSubsystem {
	/// LocalKeystore is needed for assignment keys, but not necessarily approval keys.
	///
	/// We do a lot of VRF signing and need the keys to have low latency.
	keystore: Arc<LocalKeystore>,
	db_config: DatabaseConfig,
	slot_duration_millis: u64,
	db: Arc<dyn KeyValueDB>,
	mode: Mode,
	metrics: Metrics,
}

#[derive(Clone)]
struct MetricsInner {
	imported_candidates_total: prometheus::Counter<prometheus::U64>,
	assignments_produced: prometheus::Histogram,
	approvals_produced_total: prometheus::CounterVec<prometheus::U64>,
	no_shows_total: prometheus::Counter<prometheus::U64>,
	wakeups_triggered_total: prometheus::Counter<prometheus::U64>,
	candidate_approval_time_ticks: prometheus::Histogram,
	block_approval_time_ticks: prometheus::Histogram,
	time_db_transaction: prometheus::Histogram,
	time_recover_and_approve: prometheus::Histogram,
}

/// Approval Voting metrics.
#[derive(Default, Clone)]
pub struct Metrics(Option<MetricsInner>);

impl Metrics {
	fn on_candidate_imported(&self) {
		if let Some(metrics) = &self.0 {
			metrics.imported_candidates_total.inc();
		}
	}

	fn on_assignment_produced(&self, tranche: DelayTranche) {
		if let Some(metrics) = &self.0 {
			metrics.assignments_produced.observe(tranche as f64);
		}
	}

	fn on_approval_stale(&self) {
		if let Some(metrics) = &self.0 {
			metrics.approvals_produced_total.with_label_values(&["stale"]).inc()
		}
	}

	fn on_approval_invalid(&self) {
		if let Some(metrics) = &self.0 {
			metrics.approvals_produced_total.with_label_values(&["invalid"]).inc()
		}
	}

	fn on_approval_unavailable(&self) {
		if let Some(metrics) = &self.0 {
			metrics.approvals_produced_total.with_label_values(&["unavailable"]).inc()
		}
	}

	fn on_approval_error(&self) {
		if let Some(metrics) = &self.0 {
			metrics.approvals_produced_total.with_label_values(&["internal error"]).inc()
		}
	}

	fn on_approval_produced(&self) {
		if let Some(metrics) = &self.0 {
			metrics.approvals_produced_total.with_label_values(&["success"]).inc()

	fn on_no_shows(&self, n: usize) {
		if let Some(metrics) = &self.0 {
			metrics.no_shows_total.inc_by(n as u64);
		}
	}

	fn on_wakeup(&self) {
		if let Some(metrics) = &self.0 {
			metrics.wakeups_triggered_total.inc();
		}
	}

	fn on_candidate_approved(&self, ticks: Tick) {
		if let Some(metrics) = &self.0 {
			metrics.candidate_approval_time_ticks.observe(ticks as f64);
		}
	}

	fn on_block_approved(&self, ticks: Tick) {
		if let Some(metrics) = &self.0 {
			metrics.block_approval_time_ticks.observe(ticks as f64);
		}
	}

	fn time_db_transaction(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
		self.0.as_ref().map(|metrics| metrics.time_db_transaction.start_timer())
	}

	fn time_recover_and_approve(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
		self.0.as_ref().map(|metrics| metrics.time_recover_and_approve.start_timer())
	}
}

impl metrics::Metrics for Metrics {
	fn try_register(
		registry: &prometheus::Registry,
	) -> std::result::Result<Self, prometheus::PrometheusError> {
		let metrics = MetricsInner {
			imported_candidates_total: prometheus::register(
				prometheus::Counter::new(
					"parachain_imported_candidates_total",
					"Number of candidates imported by the approval voting subsystem",
				)?,
				registry,
			)?,
			assignments_produced: prometheus::register(
				prometheus::Histogram::with_opts(
					prometheus::HistogramOpts::new(
						"parachain_assignments_produced",
						"Assignments and tranches produced by the approval voting subsystem",
					).buckets(vec![0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 15.0, 25.0, 40.0, 70.0]),
				)?,
				registry,
			)?,
			approvals_produced_total: prometheus::register(
				prometheus::CounterVec::new(
					prometheus::Opts::new(
						"parachain_approvals_produced_total",
						"Number of approvals produced by the approval voting subsystem",
					),
					&["status"]
				)?,
				registry,
			)?,
			no_shows_total: prometheus::register(
				prometheus::Counter::new(
					"parachain_approvals_no_shows_total",
					"Number of assignments which became no-shows in the approval voting subsystem",
				)?,
				registry,
			)?,
			wakeups_triggered_total: prometheus::register(
				prometheus::Counter::new(
					"parachain_approvals_wakeups_total",
					"Number of times we woke up to process a candidate in the approval voting subsystem",
				)?,
				registry,
			)?,
			candidate_approval_time_ticks: prometheus::register(
				prometheus::Histogram::with_opts(
					prometheus::HistogramOpts::new(
						"parachain_approvals_candidate_approval_time_ticks",
						"Number of ticks (500ms) to approve candidates.",
					).buckets(vec![6.0, 12.0, 18.0, 24.0, 30.0, 36.0, 72.0, 100.0, 144.0]),
				)?,
				registry,
			)?,
			block_approval_time_ticks: prometheus::register(
				prometheus::Histogram::with_opts(
					prometheus::HistogramOpts::new(
						"parachain_approvals_blockapproval_time_ticks",
						"Number of ticks (500ms) to approve blocks.",
					).buckets(vec![6.0, 12.0, 18.0, 24.0, 30.0, 36.0, 72.0, 100.0, 144.0]),
				)?,
				registry,
			)?,
			time_db_transaction: prometheus::register(
				prometheus::Histogram::with_opts(
					prometheus::HistogramOpts::new(
						"parachain_time_approval_db_transaction",
						"Time spent writing an approval db transaction.",
					)
				)?,
				registry,
			)?,
			time_recover_and_approve: prometheus::register(
				prometheus::Histogram::with_opts(
					prometheus::HistogramOpts::new(
						"parachain_time_recover_and_approve",
						"Time spent recovering and approving data in approval voting",
					)
				)?,
				registry,
			)?,
		};

		Ok(Metrics(Some(metrics)))
	}
}

impl ApprovalVotingSubsystem {
	/// Create a new approval voting subsystem with the given keystore, config, and database.
	pub fn with_config(
		config: Config,
		db: Arc<dyn KeyValueDB>,
		keystore: Arc<LocalKeystore>,
		sync_oracle: Box<dyn SyncOracle + Send>,
		metrics: Metrics,
	) -> Self {
		ApprovalVotingSubsystem {
			keystore,
			slot_duration_millis: config.slot_duration_millis,
			db,
			db_config: DatabaseConfig {
				col_data: config.col_data,
			},
			mode: Mode::Syncing(sync_oracle),
			metrics,
		}
	}
}

impl<C> Subsystem<C> for ApprovalVotingSubsystem
	where C: SubsystemContext<Message = ApprovalVotingMessage>
{
	fn start(self, ctx: C) -> SpawnedSubsystem {
		let future = run::<C>(
			ctx,
			self,
			Box::new(SystemClock),
			Box::new(RealAssignmentCriteria),
		)
			.map_err(|e| SubsystemError::with_origin("approval-voting", e))
			.boxed();

		SpawnedSubsystem {
			name: "approval-voting-subsystem",
			future,
		}
	}
}

enum BackgroundRequest {
	ApprovalVote(ApprovalVoteRequest),
	CandidateValidation(
		PersistedValidationData,
		ValidationCode,
		CandidateDescriptor,
		Arc<PoV>,
		oneshot::Sender<Result<ValidationResult, ValidationFailed>>,
	),
}

struct ApprovalVoteRequest {
	validator_index: ValidatorIndex,
	block_hash: Hash,
	candidate_index: usize,
}

#[derive(Default)]
struct Wakeups {
	// Tick -> [(Relay Block, Candidate Hash)]
	wakeups: BTreeMap<Tick, Vec<(Hash, CandidateHash)>>,
	reverse_wakeups: HashMap<(Hash, CandidateHash), Tick>,
	block_numbers: BTreeMap<BlockNumber, HashSet<Hash>>,
}

impl Wakeups {
	// Returns the first tick for which there exist wakeups, if any.
	fn first(&self) -> Option<Tick> {
		self.wakeups.keys().next().map(|t| *t)
	}

	fn note_block(&mut self, block_hash: Hash, block_number: BlockNumber) {
		self.block_numbers.entry(block_number).or_default().insert(block_hash);
	}

	// Schedules a wakeup at the given tick. This is a no-op if there is already an earlier or
	// equal wakeup for these values; any later wakeup is replaced.
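	// For example (illustrative): after `schedule(h, n, c, 10)`, a later `schedule(h, n, c, 20)`
	// is a no-op, whereas `schedule(h, n, c, 5)` moves the wakeup from tick 10 to tick 5 and
	// `wakeup_for(h, c)` then returns `Some(5)`.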
	fn schedule(
		&mut self,
		block_hash: Hash,
		block_number: BlockNumber,
		candidate_hash: CandidateHash,
		tick: Tick,
	) {
		if let Some(prev) = self.reverse_wakeups.get(&(block_hash, candidate_hash)) {
			if prev <= &tick { return }

			// we are replacing previous wakeup with an earlier one.
			if let Entry::Occupied(mut entry) = self.wakeups.entry(*prev) {
				if let Some(pos) = entry.get().iter()
					.position(|x| x == &(block_hash, candidate_hash))
				{
					entry.get_mut().remove(pos);
				}

				if entry.get().is_empty() {
					let _ = entry.remove_entry();
				}
			}
		} else {
			self.note_block(block_hash, block_number);
		}

		self.reverse_wakeups.insert((block_hash, candidate_hash), tick);
		self.wakeups.entry(tick).or_default().push((block_hash, candidate_hash));
	}

	fn prune_finalized_wakeups(&mut self, finalized_number: BlockNumber) {
		let after = self.block_numbers.split_off(&(finalized_number + 1));
		let pruned_blocks: HashSet<_> = std::mem::replace(&mut self.block_numbers, after)
			.into_iter()
			.flat_map(|(_number, hashes)| hashes)
			.collect();

		let mut pruned_wakeups = BTreeMap::new();
		self.reverse_wakeups.retain(|&(ref h, ref c_h), tick| {
			let live = !pruned_blocks.contains(h);
			if !live {
				pruned_wakeups.entry(*tick)
					.or_insert_with(HashSet::new)
					.insert((*h, *c_h));
			}
			live
		});

		for (tick, pruned) in pruned_wakeups {
			if let Entry::Occupied(mut entry) = self.wakeups.entry(tick) {
				entry.get_mut().retain(|wakeup| !pruned.contains(wakeup));
				if entry.get().is_empty() {
					let _ = entry.remove();
				}
			}
		}
	}

	// Get the wakeup for a particular block/candidate combo, if any.
	fn wakeup_for(&self, block_hash: Hash, candidate_hash: CandidateHash) -> Option<Tick> {
		self.reverse_wakeups.get(&(block_hash, candidate_hash)).map(|t| *t)
	}

	// Returns the next wakeup. This future never resolves if there are no wakeups.
	async fn next(&mut self, clock: &(dyn Clock + Sync)) -> (Tick, Hash, CandidateHash) {
		match self.first() {
			None => future::pending().await,
			Some(tick) => {
				clock.wait(tick).await;
				match self.wakeups.entry(tick) {
					Entry::Vacant(_) => panic!("entry is known to exist since `first` was `Some`; qed"),
					Entry::Occupied(mut entry) => {
						let (hash, candidate_hash) = entry.get_mut().pop()
							.expect("empty entries are removed here and in `schedule`; no other mutation of this map; qed");

						if entry.get().is_empty() {
							let _ = entry.remove();
						}
						self.reverse_wakeups.remove(&(hash, candidate_hash));

						(tick, hash, candidate_hash)
					}
				}
			}
		}
	}
}

/// A read-only handle to a database.
trait DBReader {
	fn load_block_entry(
		&self,
		block_hash: &Hash,
	) -> SubsystemResult<Option<BlockEntry>>;

	fn load_candidate_entry(
		&self,
		candidate_hash: &CandidateHash,
	) -> SubsystemResult<Option<CandidateEntry>>;

	fn load_all_blocks(&self) -> SubsystemResult<Vec<Hash>>;
}

// This is a submodule to enforce opacity of the inner DB type.
mod approval_db_v1_reader {
	use super::{
		DBReader, KeyValueDB, Hash, CandidateHash, BlockEntry, CandidateEntry,
		SubsystemResult, SubsystemError, DatabaseConfig, approval_db,
	};

	/// A DB reader that uses the approval-db V1 under the hood.
	pub(super) struct ApprovalDBV1Reader<T> {
		inner: T,
		config: DatabaseConfig,
	}
	impl<T> ApprovalDBV1Reader<T> {
		pub(super) fn new(inner: T, config: DatabaseConfig) -> Self {
			ApprovalDBV1Reader {
				inner,
				config,
			}
		}
	}

	impl<'a, T: 'a> DBReader for ApprovalDBV1Reader<T>
		where T: std::ops::Deref<Target=(dyn KeyValueDB + 'a)>
	{
		fn load_block_entry(
			&self,
			block_hash: &Hash,
		) -> SubsystemResult<Option<BlockEntry>> {
			approval_db::v1::load_block_entry(&*self.inner, &self.config, block_hash)
				.map(|e| e.map(Into::into))
				.map_err(|e| SubsystemError::with_origin("approval-voting", e))
		}

		fn load_candidate_entry(
			&self,
			candidate_hash: &CandidateHash,
		) -> SubsystemResult<Option<CandidateEntry>> {
			approval_db::v1::load_candidate_entry(&*self.inner, &self.config, candidate_hash)
				.map(|e| e.map(Into::into))
				.map_err(|e| SubsystemError::with_origin("approval-voting", e))
		}

		fn load_all_blocks(&self) -> SubsystemResult<Vec<Hash>> {
			approval_db::v1::load_all_blocks(&*self.inner, &self.config)
				.map_err(|e| SubsystemError::with_origin("approval-voting", e))
		}
	}
}
use approval_db_v1_reader::ApprovalDBV1Reader;

struct ApprovalStatus {
	required_tranches: RequiredTranches,
	tranche_now: DelayTranche,
	block_tick: Tick,
}

struct State<T> {
	session_window: RollingSessionWindow,
	keystore: Arc<LocalKeystore>,
	slot_duration_millis: u64,
	db: T,
	clock: Box<dyn Clock + Send + Sync>,
	assignment_criteria: Box<dyn AssignmentCriteria + Send + Sync>,
}

impl<T> State<T> {
	fn session_info(&self, i: SessionIndex) -> Option<&SessionInfo> {
		self.session_window.session_info(i)
	}

	// Compute the required tranches for approval for this block and candidate combo.
	// Fails if there is no approval entry for the block under the candidate or no candidate entry
	// under the block, or if the session is out of bounds.
	fn approval_status<'a, 'b>(
		&'a self,
		block_entry: &'a BlockEntry,
		candidate_entry: &'b CandidateEntry,
	) -> Option<(&'b ApprovalEntry, ApprovalStatus)> {
		let session_info = match self.session_info(block_entry.session()) {
			Some(s) => s,
			None => {
				tracing::warn!(target: LOG_TARGET, "Unknown session info for {}", block_entry.session());
				return None;
			}
		};
		let block_hash = block_entry.block_hash();

		let tranche_now = self.clock.tranche_now(self.slot_duration_millis, block_entry.slot());
		let block_tick = slot_number_to_tick(self.slot_duration_millis, block_entry.slot());
		let no_show_duration = slot_number_to_tick(
			self.slot_duration_millis,
			Slot::from(u64::from(session_info.no_show_slots)),
		);

		if let Some(approval_entry) = candidate_entry.approval_entry(&block_hash) {
			let required_tranches = approval_checking::tranches_to_approve(
				approval_entry,
				candidate_entry.approvals(),
				tranche_now,
				block_tick,
				no_show_duration,
				session_info.needed_approvals as _
			);

			let status = ApprovalStatus {
				required_tranches,
				block_tick,
				tranche_now,
			};

			Some((approval_entry, status))
		} else {
			None
		}
	}
}

#[derive(Debug)]
enum Action {
	ScheduleWakeup {
		block_hash: Hash,
		block_number: BlockNumber,
		candidate_hash: CandidateHash,
		tick: Tick,
	},
	WriteBlockEntry(BlockEntry),
	WriteCandidateEntry(CandidateHash, CandidateEntry),
	LaunchApproval {
		indirect_cert: IndirectAssignmentCert,
		assignment_tranche: DelayTranche,
		relay_block_number: BlockNumber,
		candidate_index: CandidateIndex,
		session: SessionIndex,
		candidate: CandidateReceipt,
		backing_group: GroupIndex,
	},
	Conclude,
	BecomeActive,
}

type BackgroundTaskMap = BTreeMap<BlockNumber, Vec<RemoteHandle<()>>>;

async fn run<C>(
	mut ctx: C,
	mut subsystem: ApprovalVotingSubsystem,
	clock: Box<dyn Clock + Send + Sync>,
	assignment_criteria: Box<dyn AssignmentCriteria + Send + Sync>,
) -> SubsystemResult<()>
	where C: SubsystemContext<Message = ApprovalVotingMessage>
{
	let (background_tx, background_rx) = mpsc::channel::<BackgroundRequest>(64);
	let mut state = State {
		session_window: RollingSessionWindow::new(APPROVAL_SESSIONS),
		keystore: subsystem.keystore,
		slot_duration_millis: subsystem.slot_duration_millis,
		db: ApprovalDBV1Reader::new(subsystem.db.clone(), subsystem.db_config.clone()),
		clock,
		assignment_criteria,
	};

	let mut wakeups = Wakeups::default();

	// map block numbers to background work.
	let mut background_tasks = BTreeMap::new();

	let mut last_finalized_height: Option<BlockNumber> = None;
	let mut background_rx = background_rx.fuse();

	let db_writer = &*subsystem.db;

	loop {
		let actions = futures::select! {
			(tick, woken_block, woken_candidate) = wakeups.next(&*state.clock).fuse() => {
				subsystem.metrics.on_wakeup();
				process_wakeup(
					&mut state,
					woken_block,
					woken_candidate,
					tick,
				)?
			}
			next_msg = ctx.recv().fuse() => {
				let mut actions = handle_from_overseer(
					&mut ctx,
					&mut state,
					&subsystem.metrics,
					db_writer,
					subsystem.db_config,
					next_msg?,
					&mut last_finalized_height,
					&mut wakeups,
				).await?;

				if let Some(finalized_height) = last_finalized_height {
					cleanup_background_tasks(finalized_height, &mut background_tasks);
				}

				if let Mode::Syncing(ref mut oracle) = subsystem.mode {
					if !oracle.is_major_syncing() {
						// note that we're active before processing other actions.
						actions.insert(0, Action::BecomeActive)
					}
				}

				actions
			}
			background_request = background_rx.next().fuse() => {
				if let Some(req) = background_request {
					handle_background_request(
						&mut ctx,
						&mut state,
						&subsystem.metrics,
						req,
					).await?
				} else {
					Vec::new()
				}
			}
		};

		if handle_actions(
			&mut ctx,
			&subsystem.metrics,
			&mut wakeups,
			db_writer,
			subsystem.db_config,
			&background_tx,
			&mut background_tasks,
			&mut subsystem.mode,
			actions,
		).await? {
			break;
		}
	}

	Ok(())
}

// returns `true` if any of the actions was a `Conclude` command.
async fn handle_actions(
	ctx: &mut impl SubsystemContext,
	metrics: &Metrics,
	wakeups: &mut Wakeups,
	db: &dyn KeyValueDB,
	db_config: DatabaseConfig,
	background_tx: &mpsc::Sender<BackgroundRequest>,
	background_tasks: &mut BackgroundTaskMap,
	mode: &mut Mode,
	actions: impl IntoIterator<Item = Action>,
) -> SubsystemResult<bool> {
	let mut transaction = approval_db::v1::Transaction::new(db_config);
	let mut conclude = false;

	for action in actions {
		match action {
			Action::ScheduleWakeup {
				block_hash,
				block_number,
				candidate_hash,
				tick,
			} => {
				wakeups.schedule(block_hash, block_number, candidate_hash, tick)
			}
			Action::WriteBlockEntry(block_entry) => {
				transaction.put_block_entry(block_entry.into());
			}
			Action::WriteCandidateEntry(candidate_hash, candidate_entry) => {
				transaction.put_candidate_entry(candidate_hash, candidate_entry.into());
			}
			Action::LaunchApproval {
				indirect_cert,
				assignment_tranche,
				relay_block_number,
				candidate_index,
				session,
				candidate,
				backing_group,
			} => {
				// Don't launch approval work if the node is syncing.
				if let Mode::Syncing(_) = *mode { continue }

				metrics.on_assignment_produced(assignment_tranche);
				let block_hash = indirect_cert.block_hash;
				let validator_index = indirect_cert.validator;

				ctx.send_unbounded_message(ApprovalDistributionMessage::DistributeAssignment(
					indirect_cert,
					candidate_index,
				).into());

				let handle = launch_approval(
					ctx,
					metrics.clone(),
					background_tx.clone(),
					session,
					&candidate,
					validator_index,
					block_hash,
					candidate_index as _,
					backing_group,
				).await?;

				if let Some(handle) = handle {
					background_tasks.entry(relay_block_number).or_default().push(handle);
				}
			}
			Action::BecomeActive => {
				*mode = Mode::Active;

				let messages = distribution_messages_for_activation(
					ApprovalDBV1Reader::new(db, db_config)
				)?;

				ctx.send_messages(messages.into_iter().map(Into::into)).await;
			}
			Action::Conclude => { conclude = true; }
		}
	}

	if !transaction.is_empty() {
		let _timer = metrics.time_db_transaction();

		transaction.write(db)
			.map_err(|e| SubsystemError::with_origin("approval-voting", e))?;
	}

	Ok(conclude)
}

// Clean up all background tasks which are no longer needed as they correspond to a
// finalized block.
fn cleanup_background_tasks(
	current_finalized_block: BlockNumber,
	tasks: &mut BackgroundTaskMap,
) {
	let after = tasks.split_off(&(current_finalized_block + 1));
	*tasks = after;

	// tasks up to the finalized block are dropped, and `RemoteHandle` cancels
	// the task on drop.
}

fn distribution_messages_for_activation<'a>(
	db: impl DBReader + 'a,
) -> SubsystemResult<Vec<ApprovalDistributionMessage>> {
	let all_blocks = db.load_all_blocks()?;

	let mut approval_meta = Vec::with_capacity(all_blocks.len());
	let mut messages = Vec::new();

	messages.push(ApprovalDistributionMessage::NewBlocks(Vec::new())); // dummy value.

	for block_hash in all_blocks {
		let block_entry = match db.load_block_entry(&block_hash)? {
			Some(b) => b,
			None => {
				tracing::warn!(
					target: LOG_TARGET,
					?block_hash,
					"Missing block entry",
				);

				continue
			}
		};
		approval_meta.push(BlockApprovalMeta {
			hash: block_hash,
			number: block_entry.block_number(),
			parent_hash: block_entry.parent_hash(),
			candidates: block_entry.candidates().iter().map(|(_, c_hash)| *c_hash).collect(),
			slot: block_entry.slot(),
		});

		for (i, (_, candidate_hash)) in block_entry.candidates().iter().enumerate() {
			let candidate_entry = match db.load_candidate_entry(&candidate_hash)? {
				Some(c) => c,
				None => {
					tracing::warn!(
						target: LOG_TARGET,
						?block_hash,
						?candidate_hash,
						"Missing candidate entry",
					);

					continue
				}
			};

			match candidate_entry.approval_entry(&block_hash) {
				Some(approval_entry) => {
					match approval_entry.local_statements() {
						(None, None) | (None, Some(_)) => {}, // second is impossible case.
						(Some(assignment), None) => {
							messages.push(ApprovalDistributionMessage::DistributeAssignment(
								IndirectAssignmentCert {
									block_hash,
									validator: assignment.validator_index(),
									cert: assignment.cert().clone(),
								},
								i as _,
							));
						}
						(Some(assignment), Some(approval_sig)) => {
							messages.push(ApprovalDistributionMessage::DistributeAssignment(
								IndirectAssignmentCert {
									block_hash,
									validator: assignment.validator_index(),
									cert: assignment.cert().clone(),
								},
								i as _,
							));

							messages.push(ApprovalDistributionMessage::DistributeApproval(
								IndirectSignedApprovalVote {
									block_hash,
									candidate_index: i as _,
									validator: assignment.validator_index(),
									signature: approval_sig,
								}
							))
						}
					}
				}
				None => {
					tracing::warn!(
						target: LOG_TARGET,
						?block_hash,
						?candidate_hash,
						"Missing approval entry",
					);
				}
			}
		}
	}

	messages[0] = ApprovalDistributionMessage::NewBlocks(approval_meta);
	Ok(messages)
}

// Handle an incoming message or signal from the overseer. Returns the set of actions to perform.
async fn handle_from_overseer(
	ctx: &mut impl SubsystemContext,
	state: &mut State<impl DBReader>,
	metrics: &Metrics,
	db_writer: &dyn KeyValueDB,
	db_config: DatabaseConfig,
	x: FromOverseer<ApprovalVotingMessage>,
	last_finalized_height: &mut Option<BlockNumber>,
	wakeups: &mut Wakeups,
) -> SubsystemResult<Vec<Action>> {

	let actions = match x {
		FromOverseer::Signal(OverseerSignal::ActiveLeaves(update)) => {
			let mut actions = Vec::new();

			for activated in update.activated {
				let head = activated.hash;
				match import::handle_new_head(
					ctx,
					state,
					db_writer,
					head,
					&*last_finalized_height,
				).await {
					Err(e) => return Err(SubsystemError::with_origin("db", e)),
					Ok(block_imported_candidates) => {
						// Schedule wakeups for all imported candidates.
						for block_batch in block_imported_candidates {
							tracing::debug!(
								target: LOG_TARGET,
								block_hash = ?block_batch.block_hash,
								num_candidates = block_batch.imported_candidates.len(),
								"Imported new block.",
							for (c_hash, c_entry) in block_batch.imported_candidates {
								metrics.on_candidate_imported();

								let our_tranche = c_entry
									.approval_entry(&block_batch.block_hash)
									.and_then(|a| a.our_assignment().map(|a| a.tranche()));

								if let Some(our_tranche) = our_tranche {
									let tick = our_tranche as Tick + block_batch.block_tick;
									tracing::trace!(
										target: LOG_TARGET,
										tranche = our_tranche,
										candidate_hash = ?c_hash,
										block_hash = ?block_batch.block_hash,
										block_tick = block_batch.block_tick,
										"Scheduling first wakeup.",
									// Our first wakeup will just be the tranche of our assignment,
									// if any. This will likely be superseded by incoming assignments
									// and approvals which trigger rescheduling.
									actions.push(Action::ScheduleWakeup {
										block_hash: block_batch.block_hash,
										block_number: block_batch.block_number,
										candidate_hash: c_hash,
										tick,
									});
								}
							}
						}
					}
				}
			}

			actions
		}
		FromOverseer::Signal(OverseerSignal::BlockFinalized(block_hash, block_number)) => {
			*last_finalized_height = Some(block_number);