lib.rs 48.8 KB
Newer Older
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

//! The Approval Voting Subsystem.
//!
//! This subsystem is responsible for determining candidates to do approval checks
//! on, performing those approval checks, and tracking the assignments and approvals
//! of others. It uses this information to determine when candidates and blocks have
//! been sufficiently approved to finalize.

use polkadot_node_subsystem::{
	messages::{
		AssignmentCheckResult, ApprovalCheckResult, ApprovalVotingMessage,
		RuntimeApiMessage, RuntimeApiRequest, ChainApiMessage, ApprovalDistributionMessage,
		ValidationFailed, CandidateValidationMessage, AvailabilityRecoveryMessage,
	},
	errors::RecoveryError,
	Subsystem, SubsystemContext, SubsystemError, SubsystemResult, SpawnedSubsystem,
	FromOverseer, OverseerSignal,
};
use polkadot_node_subsystem_util::{
	metrics::{self, prometheus},
};
use polkadot_primitives::v1::{
	ValidatorIndex, Hash, SessionIndex, SessionInfo, CandidateHash,
	CandidateReceipt, BlockNumber, PersistedValidationData,
	ValidationCode, CandidateDescriptor, PoV, ValidatorPair, ValidatorSignature, ValidatorId,
};
use polkadot_node_primitives::ValidationResult;
use polkadot_node_primitives::approval::{
	IndirectAssignmentCert, IndirectSignedApprovalVote, ApprovalVote, DelayTranche,
};
use polkadot_node_jaeger as jaeger;
use parity_scale_codec::Encode;
use sc_keystore::LocalKeystore;
use sp_consensus_slots::Slot;
use sp_runtime::traits::AppVerify;
use sp_application_crypto::Pair;
use kvdb::KeyValueDB;
use futures::prelude::*;
use futures::channel::{mpsc, oneshot};

use std::collections::{BTreeMap, HashMap};
use std::collections::btree_map::Entry;
use std::sync::Arc;

use approval_checking::RequiredTranches;
use persisted_entries::{ApprovalEntry, CandidateEntry, BlockEntry};
use criteria::{AssignmentCriteria, RealAssignmentCriteria};
use time::{slot_number_to_tick, Tick, Clock, ClockExt, SystemClock};

mod approval_checking;
mod approval_db;
mod criteria;
mod import;
mod time;
mod persisted_entries;

#[cfg(test)]
mod tests;

const APPROVAL_SESSIONS: SessionIndex = 6;
const LOG_TARGET: &str = "parachain::approval-voting";
/// Configuration for the approval voting subsystem.
///
/// Derives `Debug` and `Clone` so the configuration can be logged and handed to more
/// than one consumer; every field is a plain standard-library type.
#[derive(Debug, Clone)]
pub struct Config {
	/// The path where the approval-voting DB should be kept. This directory is completely removed when starting
	/// the service.
	pub path: std::path::PathBuf,
	/// The cache size, in bytes, to spend on approval checking metadata.
	pub cache_size: Option<usize>,
	/// The slot duration of the consensus algorithm, in milliseconds. Should be evenly
	/// divisible by 500.
	pub slot_duration_millis: u64,
}

/// The approval voting subsystem.
pub struct ApprovalVotingSubsystem {
	// NOTE(review): the keystore remark below was attached to `slot_duration_millis`, which it
	// does not describe. `with_config` accepts an `Arc<LocalKeystore>` and `run` reads
	// `subsystem.keystore`, so a keystore field appears to be missing from this struct —
	// confirm and restore it.
	//
	// LocalKeystore is needed for assignment keys, but not necessarily approval keys.
	// We do a lot of VRF signing and need the keys to have low latency.

	/// Slot duration in milliseconds, taken from `Config::slot_duration_millis`.
	slot_duration_millis: u64,
	/// Shared handle to the key-value database; `run` both reads through it and writes to it.
	db: Arc<dyn KeyValueDB>,
	/// Metrics handle used to report subsystem activity.
	metrics: Metrics,
}

/// The Prometheus collectors reported by the approval voting subsystem.
#[derive(Clone)]
struct MetricsInner {
	/// Number of candidates imported by the approval voting subsystem.
	imported_candidates_total: prometheus::Counter<prometheus::U64>,
	/// Number of assignments produced by the approval voting subsystem.
	assignments_produced_total: prometheus::Counter<prometheus::U64>,
	/// Number of approvals produced by the approval voting subsystem.
	approvals_produced_total: prometheus::Counter<prometheus::U64>,
	/// Number of assignments which became no-shows.
	no_shows_total: prometheus::Counter<prometheus::U64>,
	/// Number of times the subsystem woke up to process a candidate.
	wakeups_triggered_total: prometheus::Counter<prometheus::U64>,
	/// Distribution of the number of ticks taken to approve candidates.
	candidate_approval_time_ticks: prometheus::Histogram,
	/// Distribution of the number of ticks taken to approve blocks.
	block_approval_time_ticks: prometheus::Histogram,
}

/// Approval Voting metrics.
///
/// Wraps `None` when no collectors are registered (this is what `Default` produces), in
/// which case every recording method does nothing.
#[derive(Default, Clone)]
pub struct Metrics(Option<MetricsInner>);

impl Metrics {
	fn on_candidate_imported(&self) {
		if let Some(metrics) = &self.0 {
			metrics.imported_candidates_total.inc();
		}
	}

	fn on_assignment_produced(&self) {
		if let Some(metrics) = &self.0 {
			metrics.assignments_produced_total.inc();
		}
	}

	fn on_approval_produced(&self) {
		if let Some(metrics) = &self.0 {
			metrics.approvals_produced_total.inc();
		}
	}

	fn on_no_shows(&self, n: usize) {
		if let Some(metrics) = &self.0 {
			metrics.no_shows_total.inc_by(n as u64);
		}
	}

	fn on_wakeup(&self) {
		if let Some(metrics) = &self.0 {
			metrics.wakeups_triggered_total.inc();
		}
	}

	fn on_candidate_approved(&self, ticks: Tick) {
		if let Some(metrics) = &self.0 {
			metrics.candidate_approval_time_ticks.observe(ticks as f64);
		}
	}

	fn on_block_approved(&self, ticks: Tick) {
		if let Some(metrics) = &self.0 {
			metrics.block_approval_time_ticks.observe(ticks as f64);
		}
	}
}

impl metrics::Metrics for Metrics {
	fn try_register(
		registry: &prometheus::Registry,
	) -> std::result::Result<Self, prometheus::PrometheusError> {
		let metrics = MetricsInner {
			imported_candidates_total: prometheus::register(
				prometheus::Counter::new(
					"parachain_imported_candidates_total",
					"Number of candidates imported by the approval voting subsystem",
				)?,
				registry,
			)?,
			assignments_produced_total: prometheus::register(
				prometheus::Counter::new(
					"parachain_assignments_produced_total",
					"Number of assignments produced by the approval voting subsystem",
				)?,
				registry,
			)?,
			approvals_produced_total: prometheus::register(
				prometheus::Counter::new(
					"parachain_approvals_produced_total",
					"Number of approvals produced by the approval voting subsystem",
				)?,
				registry,
			)?,
			no_shows_total: prometheus::register(
				prometheus::Counter::new(
					"parachain_approvals_no_shows_total",
					"Number of assignments which became no-shows in the approval voting subsystem",
				)?,
				registry,
			)?,
			wakeups_triggered_total: prometheus::register(
				prometheus::Counter::new(
					"parachain_approvals_wakeups_total",
					"Number of times we woke up to process a candidate in the approval voting subsystem",
				)?,
				registry,
			)?,
			candidate_approval_time_ticks: prometheus::register(
				prometheus::Histogram::with_opts(
					prometheus::HistogramOpts::new(
						"parachain_approvals_candidate_approval_time_ticks",
						"Number of ticks (500ms) to approve candidates.",
					).buckets(vec![6.0, 12.0, 18.0, 24.0, 30.0, 36.0, 72.0, 100.0, 144.0]),
				)?,
				registry,
			)?,
			block_approval_time_ticks: prometheus::register(
				prometheus::Histogram::with_opts(
					prometheus::HistogramOpts::new(
						"parachain_approvals_blockapproval_time_ticks",
						"Number of ticks (500ms) to approve blocks.",
					).buckets(vec![6.0, 12.0, 18.0, 24.0, 30.0, 36.0, 72.0, 100.0, 144.0]),
				)?,
				registry,
			)?,
		Ok(Metrics(Some(metrics)))
	}
impl ApprovalVotingSubsystem {
	/// Create a new approval voting subsystem with the given keystore and config,
	/// which creates a DB at the given path. This function will delete the directory
	/// at the given path if it already exists.
	///
	/// Uses a 100 MiB cache when `config.cache_size` is `None`. Any I/O error raised while
	/// clearing and recreating the database is returned to the caller.
	pub fn with_config(
		config: Config,
		keystore: Arc<LocalKeystore>,
		metrics: Metrics,
	) -> std::io::Result<Self> {
		const DEFAULT_CACHE_SIZE: usize = 100 * 1024 * 1024; // 100MiB default should be fine unless finality stalls.

		let db = approval_db::v1::clear_and_recreate(
			&config.path,
			config.cache_size.unwrap_or(DEFAULT_CACHE_SIZE),
		)?;

		// NOTE(review): the struct literal below stops after its first field. The remaining
		// fields and the closing delimiters of the literal, the function and this `impl` are
		// absent here and must be restored before this compiles.
		Ok(ApprovalVotingSubsystem {
			slot_duration_millis: config.slot_duration_millis,
impl<C> Subsystem<C> for ApprovalVotingSubsystem
	where C: SubsystemContext<Message = ApprovalVotingMessage>
{
	fn start(self, ctx: C) -> SpawnedSubsystem {
		let future = run::<C>(
			ctx,
			self,
			Box::new(SystemClock),
			Box::new(RealAssignmentCriteria),
		)
			.map_err(|e| SubsystemError::with_origin("approval-voting", e))
			.boxed();

		SpawnedSubsystem {
			name: "approval-voting-subsystem",
			future,
		}
	}
}

/// A request delivered to the main loop over the background channel and dispatched by
/// `handle_background_request`.
enum BackgroundRequest {
	/// Ask the subsystem to issue an approval vote; handled by `issue_approval`.
	ApprovalVote(ApprovalVoteRequest),
	/// Ask for a candidate to be validated. The fields are forwarded unchanged as a
	/// `CandidateValidationMessage::ValidateFromExhaustive`, with the sender receiving the outcome.
	CandidateValidation(
		PersistedValidationData,
		ValidationCode,
		CandidateDescriptor,
		Arc<PoV>,
		oneshot::Sender<Result<ValidationResult, ValidationFailed>>,
	),
}

/// Payload of `BackgroundRequest::ApprovalVote`.
struct ApprovalVoteRequest {
	/// Index of the validator the vote is issued for.
	validator_index: ValidatorIndex,
	/// Hash of the relay block the candidate is being approved under.
	block_hash: Hash,
	/// Position of the candidate — presumably within the block identified by `block_hash`;
	/// verify against `issue_approval`.
	candidate_index: usize,
}

/// Pending wakeups, indexed both by tick and by (relay block, candidate) pair.
///
/// `schedule` and `next` keep the two maps in step: each pair has at most one tick in
/// `reverse_wakeups`, and that tick's bucket in `wakeups` contains the pair.
#[derive(Default)]
struct Wakeups {
	// Tick -> [(Relay Block, Candidate Hash)]
	wakeups: BTreeMap<Tick, Vec<(Hash, CandidateHash)>>,
	// (Relay Block, Candidate Hash) -> the tick it is currently scheduled for
	reverse_wakeups: HashMap<(Hash, CandidateHash), Tick>,
}

impl Wakeups {
	// The earliest tick that has a wakeup scheduled, or `None` when nothing is pending.
	fn first(&self) -> Option<Tick> {
		self.wakeups.keys().copied().next()
	}

	// Registers a wakeup for the block/candidate pair at `tick`. An existing wakeup at the
	// same or an earlier tick wins and leaves everything untouched; a later one is replaced.
	fn schedule(&mut self, block_hash: Hash, candidate_hash: CandidateHash, tick: Tick) {
		let key = (block_hash, candidate_hash);

		match self.reverse_wakeups.get(&key).copied() {
			// already due no later than `tick`: nothing to do.
			Some(existing) if existing <= tick => return,
			Some(existing) => {
				// the pair moves to an earlier tick, so take it out of its old bucket.
				let bucket_emptied = match self.wakeups.get_mut(&existing) {
					Some(bucket) => {
						if let Some(idx) = bucket.iter().position(|scheduled| *scheduled == key) {
							bucket.remove(idx);
						}
						bucket.is_empty()
					}
					None => false,
				};

				// never leave an empty bucket behind; `next` relies on that.
				if bucket_emptied {
					self.wakeups.remove(&existing);
				}
			}
			None => {}
		}

		self.reverse_wakeups.insert(key, tick);
		self.wakeups.entry(tick).or_default().push(key);
	}

	// Looks up the tick currently scheduled for a block/candidate pair, if there is one.
	fn wakeup_for(&self, block_hash: Hash, candidate_hash: CandidateHash) -> Option<Tick> {
		let key = (block_hash, candidate_hash);
		self.reverse_wakeups.get(&key).copied()
	}

	// Returns the next wakeup. this future never returns if there are no wakeups.
	async fn next(&mut self, clock: &(dyn Clock + Sync)) -> (Tick, Hash, CandidateHash) {
		match self.first() {
			None => future::pending().await,
			Some(tick) => {
				clock.wait(tick).await;
				match self.wakeups.entry(tick) {
					Entry::Vacant(_) => panic!("entry is known to exist since `first` was `Some`; qed"),
					Entry::Occupied(mut entry) => {
						let (hash, candidate_hash) = entry.get_mut().pop()
							.expect("empty entries are removed here and in `schedule`; no other mutation of this map; qed");

						if entry.get().is_empty() {
							let _ = entry.remove();
						}
						self.reverse_wakeups.remove(&(hash, candidate_hash));

						(tick, hash, candidate_hash)
	}
}

/// A read-only handle to a database.
trait DBReader {
	/// Loads the block entry stored for `block_hash`.
	///
	/// Presumably yields `Ok(None)` when the database holds no such entry — confirm against
	/// the backing implementation. Database failures surface as `Err`.
	fn load_block_entry(
		&self,
		block_hash: &Hash,
	) -> SubsystemResult<Option<BlockEntry>>;

	/// Loads the candidate entry stored for `candidate_hash`.
	///
	/// Presumably yields `Ok(None)` when the database holds no such entry — confirm against
	/// the backing implementation. Database failures surface as `Err`.
	fn load_candidate_entry(
		&self,
		candidate_hash: &CandidateHash,
	) -> SubsystemResult<Option<CandidateEntry>>;
}

// Kept in its own submodule so that the wrapped DB handle stays private to the reader.
mod approval_db_v1_reader {
	use super::{
		DBReader, KeyValueDB, Hash, CandidateHash, BlockEntry, CandidateEntry,
		Arc, SubsystemResult, SubsystemError, approval_db,
	};

	/// Read-only access to the approval-db V1 schema through a shared handle.
	pub(super) struct ApprovalDBV1Reader<T: ?Sized>(Arc<T>);

	impl<T: ?Sized> From<Arc<T>> for ApprovalDBV1Reader<T> {
		fn from(inner: Arc<T>) -> Self {
			Self(inner)
		}
	}

	impl DBReader for ApprovalDBV1Reader<dyn KeyValueDB> {
		/// Reads a V1 block entry and converts it into the in-memory representation.
		fn load_block_entry(
			&self,
			block_hash: &Hash,
		) -> SubsystemResult<Option<BlockEntry>> {
			match approval_db::v1::load_block_entry(&*self.0, block_hash) {
				Ok(maybe_entry) => Ok(maybe_entry.map(Into::into)),
				Err(err) => Err(SubsystemError::with_origin("approval-voting", err)),
			}
		}

		/// Reads a V1 candidate entry and converts it into the in-memory representation.
		fn load_candidate_entry(
			&self,
			candidate_hash: &CandidateHash,
		) -> SubsystemResult<Option<CandidateEntry>> {
			match approval_db::v1::load_candidate_entry(&*self.0, candidate_hash) {
				Ok(maybe_entry) => Ok(maybe_entry.map(Into::into)),
				Err(err) => Err(SubsystemError::with_origin("approval-voting", err)),
			}
		}
	}
}
use approval_db_v1_reader::ApprovalDBV1Reader;

/// State threaded through the handlers of the subsystem's main loop.
///
/// NOTE(review): `run` initialises a `keystore` field when building this struct, but no such
/// field is declared here — confirm whether its declaration has been lost.
struct State<T> {
	/// Window of session information, queried through `State::session_info`.
	session_window: import::RollingSessionWindow,
	/// Slot duration in milliseconds, copied from the subsystem when the loop starts.
	slot_duration_millis: u64,
	/// Database handle; the handlers require `T: DBReader`.
	db: T,
	/// Clock the main loop waits on for scheduled wakeup ticks.
	clock: Box<dyn Clock + Send + Sync>,
	/// Assignment criteria implementation supplied by the caller of `run`.
	assignment_criteria: Box<dyn AssignmentCriteria + Send + Sync>,
}

impl<T> State<T> {
	/// Looks up the `SessionInfo` for session index `i` in the session window, returning
	/// whatever the window reports for that index.
	fn session_info(&self, i: SessionIndex) -> Option<&SessionInfo> {
		self.session_window.session_info(i)
	}
}

#[derive(Debug)]
enum Action {
	ScheduleWakeup {
		block_hash: Hash,
		candidate_hash: CandidateHash,
		tick: Tick,
	},
	WriteBlockEntry(BlockEntry),
	WriteCandidateEntry(CandidateHash, CandidateEntry),
	LaunchApproval {
		indirect_cert: IndirectAssignmentCert,
		candidate_index: CandidateIndex,
		session: SessionIndex,
		candidate: CandidateReceipt,
	subsystem: ApprovalVotingSubsystem,
	clock: Box<dyn Clock + Send + Sync>,
	assignment_criteria: Box<dyn AssignmentCriteria + Send + Sync>,
) -> SubsystemResult<()>
	where C: SubsystemContext<Message = ApprovalVotingMessage>
{
	let (background_tx, background_rx) = mpsc::channel::<BackgroundRequest>(64);
	let mut state = State {
		session_window: Default::default(),
		keystore: subsystem.keystore,
		slot_duration_millis: subsystem.slot_duration_millis,
		db: ApprovalDBV1Reader::from(subsystem.db.clone()),
		clock,
		assignment_criteria,
	};

	let mut wakeups = Wakeups::default();

	let mut last_finalized_height: Option<BlockNumber> = None;
	let mut background_rx = background_rx.fuse();

	let db_writer = &*subsystem.db;

	loop {
		let actions = futures::select! {
			(tick, woken_block, woken_candidate) = wakeups.next(&*state.clock).fuse() => {
				subsystem.metrics.on_wakeup();
				process_wakeup(
					&mut state,
					woken_block,
					woken_candidate,
			}
			next_msg = ctx.recv().fuse() => {
				handle_from_overseer(
					&mut ctx,
					&mut state,
					&subsystem.metrics,
					db_writer,
					next_msg?,
					&mut last_finalized_height,
				).await?
			}
			background_request = background_rx.next().fuse() => {
				if let Some(req) = background_request {
					handle_background_request(
						&mut ctx,
						&mut state,
						&subsystem.metrics,
						req,
					).await?
				} else {
					Vec::new()
				}
			}
		};

		if handle_actions(
			&mut ctx,
			&subsystem.metrics,
			&mut wakeups,
			db_writer,
			&background_tx,
			actions,
		).await? {
			break;
		}
	}

	Ok(())
}

// returns `true` if any of the actions was a `Conclude` command.
async fn handle_actions(
	ctx: &mut impl SubsystemContext,
	metrics: &Metrics,
	wakeups: &mut Wakeups,
	db: &dyn KeyValueDB,
	background_tx: &mpsc::Sender<BackgroundRequest>,
	actions: impl IntoIterator<Item = Action>,
) -> SubsystemResult<bool> {
	let mut transaction = approval_db::v1::Transaction::default();
	let mut conclude = false;

	for action in actions {
		match action {
			Action::ScheduleWakeup {
				block_hash,
				candidate_hash,
				tick,
			} => wakeups.schedule(block_hash, candidate_hash, tick),
			Action::WriteBlockEntry(block_entry) => {
				transaction.put_block_entry(block_entry.into());
			}
			Action::WriteCandidateEntry(candidate_hash, candidate_entry) => {
				transaction.put_candidate_entry(candidate_hash, candidate_entry.into());
			}
			Action::LaunchApproval {
				indirect_cert,
				candidate_index,
				session,
				candidate,
				metrics.on_assignment_produced();
				let block_hash = indirect_cert.block_hash;
				let validator_index = indirect_cert.validator;

				ctx.send_message(ApprovalDistributionMessage::DistributeAssignment(
					indirect_cert,
					candidate_index,
				).into()).await;

				launch_approval(
					ctx,
					background_tx.clone(),
					session,
					&candidate,
					validator_index,
					block_hash,
					candidate_index as _,
				).await?
			}
			Action::Conclude => { conclude = true; }
		}
	}

	transaction.write(db)
		.map_err(|e| SubsystemError::with_origin("approval-voting", e))?;

	Ok(conclude)
}

// Handle an incoming signal from the overseer. Returns true if execution should conclude.
async fn handle_from_overseer(
	ctx: &mut impl SubsystemContext,
	state: &mut State<impl DBReader>,
	metrics: &Metrics,
	db_writer: &dyn KeyValueDB,
	x: FromOverseer<ApprovalVotingMessage>,
	last_finalized_height: &mut Option<BlockNumber>,
	wakeups: &Wakeups,
) -> SubsystemResult<Vec<Action>> {

	let actions = match x {
		FromOverseer::Signal(OverseerSignal::ActiveLeaves(update)) => {
			let mut actions = Vec::new();

			for activated in update.activated {
				let head = activated.hash;
				match import::handle_new_head(
					ctx,
					state,
					db_writer,
					head,
					&*last_finalized_height,
				).await {
					Err(e) => return Err(SubsystemError::with_origin("db", e)),
					Ok(block_imported_candidates) => {
						// Schedule wakeups for all imported candidates.
						for block_batch in block_imported_candidates {
							tracing::debug!(
								target: LOG_TARGET,
								block_hash = ?block_batch.block_hash,
								num_candidates = block_batch.imported_candidates.len(),
								"Imported new block.",
							for (c_hash, c_entry) in block_batch.imported_candidates {
								metrics.on_candidate_imported();

								let our_tranche = c_entry
									.approval_entry(&block_batch.block_hash)
									.and_then(|a| a.our_assignment().map(|a| a.tranche()));

								if let Some(our_tranche) = our_tranche {
									let tick = our_tranche as Tick + block_batch.block_tick;
									tracing::trace!(
										target: LOG_TARGET,
										tranche = our_tranche,
										candidate_hash = ?c_hash,
										block_hash = ?block_batch.block_hash,
										block_tick = block_batch.block_tick,
										"Scheduling first wakeup.",
									// Our first wakeup will just be the tranche of our assignment,
									// if any. This will likely be superseded by incoming assignments
									// and approvals which trigger rescheduling.
									actions.push(Action::ScheduleWakeup {
										block_hash: block_batch.block_hash,
										candidate_hash: c_hash,
									});
								}
							}
						}
					}
				}
			}

			actions
		}
		FromOverseer::Signal(OverseerSignal::BlockFinalized(block_hash, block_number)) => {
			*last_finalized_height = Some(block_number);

			approval_db::v1::canonicalize(db_writer, block_number, block_hash)
				.map_err(|e| SubsystemError::with_origin("db", e))?;

			Vec::new()
		}
		FromOverseer::Signal(OverseerSignal::Conclude) => {
			vec![Action::Conclude]
		}
		FromOverseer::Communication { msg } => match msg {
			ApprovalVotingMessage::CheckAndImportAssignment(a, claimed_core, res) => {
				let (check_outcome, actions)
					= check_and_import_assignment(state, metrics, a, claimed_core)?;
				let _ = res.send(check_outcome);
				actions
			}
			ApprovalVotingMessage::CheckAndImportApproval(a, res) => {
				check_and_import_approval(state, metrics, a, |r| { let _ = res.send(r); })?.0
			}
			ApprovalVotingMessage::ApprovedAncestor(target, lower_bound, res ) => {
				match handle_approved_ancestor(ctx, &state.db, target, lower_bound, wakeups).await {
					Ok(v) => {
						let _ = res.send(v);
					}
					Err(e) => {
						let _ = res.send(None);
						return Err(e);
					}
				}

				Vec::new()
			}
		}
	};

	Ok(actions)
}

// Dispatches a request received over the background channel. Approval votes are issued
// directly and yield whatever actions `issue_approval` returns; validation requests are
// forwarded to candidate validation and yield no actions.
async fn handle_background_request(
	ctx: &mut impl SubsystemContext,
	state: &State<impl DBReader>,
	metrics: &Metrics,
	request: BackgroundRequest,
) -> SubsystemResult<Vec<Action>> {
	let (validation_data, validation_code, descriptor, pov, tx) = match request {
		BackgroundRequest::ApprovalVote(vote_request) => {
			return issue_approval(ctx, state, metrics, vote_request).await;
		}
		BackgroundRequest::CandidateValidation(data, code, descriptor, pov, tx) => {
			(data, code, descriptor, pov, tx)
		}
	};

	let validate = CandidateValidationMessage::ValidateFromExhaustive(
		validation_data,
		validation_code,
		descriptor,
		pov,
		tx,
	);
	ctx.send_message(validate.into()).await;

	Ok(Vec::new())
}

async fn handle_approved_ancestor(
	ctx: &mut impl SubsystemContext,
	db: &impl DBReader,
	target: Hash,
	lower_bound: BlockNumber,
	wakeups: &Wakeups,
) -> SubsystemResult<Option<(Hash, BlockNumber)>> {
	const MAX_TRACING_WINDOW: usize = 200;
	const ABNORMAL_DEPTH_THRESHOLD: usize = 5;

	use bitvec::{order::Lsb0, vec::BitVec};

	let mut span = jaeger::Span::new(&target, "approved-ancestor")
		.with_stage(jaeger::Stage::ApprovalChecking);
	let mut all_approved_max = None;

		let (tx, rx) = oneshot::channel();

		ctx.send_message(ChainApiMessage::BlockNumber(target, tx).into()).await;

		match rx.await {
			Ok(Ok(Some(n))) => n,
			Ok(Ok(None)) => return Ok(None),
			Ok(Err(_)) | Err(_)  => return Ok(None),
	if target_number <= lower_bound { return Ok(None) }
	span.add_string_fmt_debug_tag("target-number", target_number);
	span.add_string_fmt_debug_tag("target-hash", target);
	// request ancestors up to but not including the lower bound,
	// as a vote on the lower bound is implied if we cannot find
	// anything else.
	let ancestry = if target_number > lower_bound + 1 {
		let (tx, rx) = oneshot::channel();

		ctx.send_message(ChainApiMessage::Ancestors {
			hash: target,
			k: (target_number - (lower_bound + 1)) as usize,
			response_channel: tx,
		}.into()).await;

		match rx.await {
			Ok(Ok(a)) => a,
			Err(_) | Ok(Err(_)) => return Ok(None),
	let mut bits: BitVec<Lsb0, u8> = Default::default();
	for (i, block_hash) in std::iter::once(target).chain(ancestry).enumerate() {
		// Block entries should be present as the assumption is that
		// nothing here is finalized. If we encounter any missing block
		// entries we can fail.
		let entry = match db.load_block_entry(&block_hash)? {
			None => {
				tracing::trace!{
					target: LOG_TARGET,
					"Chain between ({}, {}) and {} not fully known. Forcing vote on {}",
					target,
					target_number,
					lower_bound,
					lower_bound,
				}
				return Ok(None);
			}
		// even if traversing millions of blocks this is fairly cheap and always dwarfed by the
		// disk lookups.
		bits.push(entry.is_fully_approved());
		if entry.is_fully_approved() {
			if all_approved_max.is_none() {
				// First iteration of the loop is target, i = 0. After that,
				// ancestry is moving backwards.
				all_approved_max = Some((block_hash, target_number - i as BlockNumber));
		} else if bits.len() <= ABNORMAL_DEPTH_THRESHOLD {
			all_approved_max = None;
		} else {
			all_approved_max = None;

			let unapproved: Vec<_> = entry.unapproved_candidates().collect();
			tracing::debug!(
				target: LOG_TARGET,
				"Block {} is {} blocks deep and has {}/{} candidates unapproved",
				block_hash,
				bits.len() - 1,
				unapproved.len(),
				entry.candidates().len(),
			);

			for candidate_hash in unapproved {
				match db.load_candidate_entry(&candidate_hash)? {
					None => {
						tracing::warn!(
							target: LOG_TARGET,
							?candidate_hash,
							"Missing expected candidate in DB",
						);

						continue;
					}
					Some(c_entry) => {
						match c_entry.approval_entry(&block_hash) {
							None => {
								tracing::warn!(
									target: LOG_TARGET,
									?candidate_hash,
									?block_hash,
									"Missing expected approval entry under candidate.",
								);
							}
							Some(a_entry) => {
								let n_assignments = a_entry.n_assignments();
								let n_approvals = c_entry.approvals().count_ones();

								let status = || format!("{}/{}/{}",
									n_assignments,
									n_approvals,
									a_entry.n_validators(),
								);

								match a_entry.our_assignment() {
									None => tracing::debug!(
										target: LOG_TARGET,
										?candidate_hash,
										?block_hash,
										status = %status(),
										"no assignment."
									),
									Some(a) => {
										let tranche = a.tranche();
										let triggered = a.triggered();

										let next_wakeup = wakeups.wakeup_for(
											block_hash,
											candidate_hash,
										);

										tracing::debug!(
											target: LOG_TARGET,
											?candidate_hash,
											?block_hash,
											tranche,
											?next_wakeup,
											status = %status(),
	tracing::trace!(
		target: LOG_TARGET,
		"approved blocks {}-[{}]-{}",
		target_number,
		{
			// formatting to divide bits by groups of 10.
			// when comparing logs on multiple machines where the exact vote
			// targets may differ, this grouping is useful.
			let mut s = String::with_capacity(bits.len());
			for (i, bit) in bits.iter().enumerate().take(MAX_TRACING_WINDOW) {
				s.push(if *bit { '1' } else { '0' });
				if (target_number - i as u32) % 10 == 0 && i != bits.len() - 1 { s.push(' '); }
			}

			s
		},
		if bits.len() > MAX_TRACING_WINDOW {
			format!(
				"{}... (truncated due to large window)",
				target_number - MAX_TRACING_WINDOW as u32 + 1,
			)
		} else {
			format!("{}", lower_bound + 1)
		},
	);

	match all_approved_max {
		Some((ref hash, ref number)) => {
			span.add_uint_tag("approved-number", *number as u64);
			span.add_string_fmt_debug_tag("approved-hash", hash);
		}
		None => {
			span.add_string_tag("reached-lower-bound", "true");
		}
	}

	Ok(all_approved_max)
}

// Builds the byte payload that is signed for an approval vote: the encoding of a fixed
// 4-byte tag followed by the vote and the session index.
fn approval_signing_payload(
	approval_vote: ApprovalVote,
	session_index: SessionIndex,
) -> Vec<u8> {
	const MAGIC: [u8; 4] = *b"APPR";

	let payload = (MAGIC, approval_vote, session_index);
	payload.encode()
}

// `Option::cmp` orders `None` before `Some`, so a plain `min` on the options would let
// `None` win. This takes the smallest of the values that are actually present instead;
// on a tie the value from `a` is returned.
fn min_prefer_some<T: std::cmp::Ord>(
	a: Option<T>,
	b: Option<T>,
) -> Option<T> {
	a.into_iter().chain(b).min()
}

fn schedule_wakeup_action(
	approval_entry: &ApprovalEntry,
	block_hash: Hash,
	candidate_hash: CandidateHash,
	block_tick: Tick,
	required_tranches: RequiredTranches,
) -> Option<Action> {
	let maybe_action = match required_tranches {
		_ if approval_entry.is_approved() => None,
		RequiredTranches::All => None,
		RequiredTranches::Exact { next_no_show, .. } => next_no_show.map(|tick| Action::ScheduleWakeup {
			block_hash,
			candidate_hash,
			tick,
		}),
		RequiredTranches::Pending { considered, next_no_show, clock_drift, .. } => {
			// select the minimum of `next_no_show`, or the tick of the next non-empty tranche
			// after `considered`, including any tranche that might contain our own untriggered
			// assignment.
			let next_non_empty_tranche = {
				let next_announced = approval_entry.tranches().iter()
					.skip_while(|t| t.tranche() <= considered)
					.map(|t| t.tranche())
					.next();

				let our_untriggered = approval_entry
					.our_assignment()
					.and_then(|t| if !t.triggered() && t.tranche() > considered {
						Some(t.tranche())
					} else {
						None
					});

				// Apply the clock drift to these tranches.
				min_prefer_some(next_announced, our_untriggered)
					.map(|t| t as Tick + block_tick + clock_drift)
			};

			min_prefer_some(next_non_empty_tranche, next_no_show)
				.map(|tick| Action::ScheduleWakeup { block_hash, candidate_hash, tick })
		}
		Some(Action::ScheduleWakeup { ref tick, .. }) => tracing::trace!(
			target: LOG_TARGET,
			tick,
			?candidate_hash,
			?block_hash,
			"Scheduling next wakeup.",
		None => tracing::trace!(
			target: LOG_TARGET,
			?candidate_hash,
			?block_hash,