Newer
Older
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! The Approval Voting Subsystem.
//!
//! This subsystem is responsible for determining candidates to do approval checks
//! on, performing those approval checks, and tracking the assignments and approvals
//! of others. It uses this information to determine when candidates and blocks have
//! been sufficiently approved to finalize.
AssignmentCheckError, AssignmentCheckResult, ApprovalCheckError, ApprovalCheckResult,
ApprovalVotingMessage, RuntimeApiMessage, RuntimeApiRequest, ChainApiMessage,
ApprovalDistributionMessage, ValidationFailed, CandidateValidationMessage,
AvailabilityRecoveryMessage,
},
errors::RecoveryError,
Subsystem, SubsystemContext, SubsystemError, SubsystemResult, SpawnedSubsystem,
FromOverseer, OverseerSignal,
};
use polkadot_node_subsystem_util::{
metrics::{self, prometheus},
rolling_session_window::RollingSessionWindow,
use polkadot_primitives::v1::{
ValidatorIndex, Hash, SessionIndex, SessionInfo, CandidateHash,
CandidateReceipt, BlockNumber, PersistedValidationData,
ValidationCode, CandidateDescriptor, ValidatorPair, ValidatorSignature, ValidatorId,
CandidateIndex, GroupIndex, ApprovalVote,
use polkadot_node_primitives::{ValidationResult, PoV};
use polkadot_node_primitives::approval::{
IndirectAssignmentCert, IndirectSignedApprovalVote, DelayTranche, BlockApprovalMeta,
use polkadot_node_jaeger as jaeger;
use sc_keystore::LocalKeystore;
use sp_consensus_slots::Slot;
use sp_runtime::traits::AppVerify;
use sp_application_crypto::Pair;
use futures::future::RemoteHandle;
use futures::channel::{mpsc, oneshot};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::collections::btree_map::Entry;
use std::sync::Arc;
use approval_checking::RequiredTranches;
use persisted_entries::{ApprovalEntry, CandidateEntry, BlockEntry};
use criteria::{AssignmentCriteria, RealAssignmentCriteria};
use time::{slot_number_to_tick, Tick, Clock, ClockExt, SystemClock};
mod approval_checking;
mod approval_db;
mod criteria;
mod import;
mod time;
mod persisted_entries;
use crate::approval_db::v1::Config as DatabaseConfig;
#[cfg(test)]
mod tests;
const APPROVAL_SESSIONS: SessionIndex = 6;
const LOG_TARGET: &str = "parachain::approval-voting";
/// Configuration for the approval voting subsystem.
#[derive(Debug, Clone)]
pub struct Config {
    /// The column family in the DB where approval-voting data is stored.
    pub col_data: u32,
    /// The slot duration of the consensus algorithm, in milliseconds. Should be evenly
    /// divisible by 500.
    pub slot_duration_millis: u64,
}
// The mode of the approval voting subsystem. It should start in a `Syncing` mode when it first
// starts, and then once it's reached the head of the chain it should move into the `Active` mode.
//
// In `Active` mode, the node is an active participant in the approvals protocol. When syncing,
// the node follows the new incoming blocks and finalized number, but does not yet participate.
//
// When transitioning from `Syncing` to `Active`, the node notifies the `ApprovalDistribution`
// subsystem of all unfinalized blocks and the candidates included within them, as well as all
// votes that the local node itself has cast on candidates within those blocks.
enum Mode {
// The node participates in the approvals protocol.
Active,
// The node is still syncing. The boxed oracle is polled (`is_major_syncing`) after each
// overseer message to decide when to switch to `Active`.
Syncing(Box<dyn SyncOracle + Send>),
}
/// The approval voting subsystem.
pub struct ApprovalVotingSubsystem {
/// LocalKeystore is needed for assignment keys, but not necessarily approval keys.
///
/// We do a lot of VRF signing and need the keys to have low latency.
keystore: Arc<LocalKeystore>,
slot_duration_millis: u64,
db: Arc<dyn KeyValueDB>,
metrics: Metrics,
}
// The registered prometheus collectors backing `Metrics`.
#[derive(Clone)]
struct MetricsInner {
// Number of candidates imported by the subsystem.
imported_candidates_total: prometheus::Counter<prometheus::U64>,
// Tranches of the assignments produced by the subsystem.
assignments_produced: prometheus::Histogram,
// Number of approvals produced, labelled by `status`
// ("success", "stale", "invalid", "unavailable", "internal error").
approvals_produced_total: prometheus::CounterVec<prometheus::U64>,
// Number of assignments which became no-shows.
no_shows_total: prometheus::Counter<prometheus::U64>,
// Number of times the subsystem woke up to process a candidate.
wakeups_triggered_total: prometheus::Counter<prometheus::U64>,
// Number of ticks (500ms) taken to approve candidates.
candidate_approval_time_ticks: prometheus::Histogram,
// Number of ticks (500ms) taken to approve blocks.
block_approval_time_ticks: prometheus::Histogram,
// Time spent writing an approval db transaction.
time_db_transaction: prometheus::Histogram,
// Time spent recovering and approving data.
time_recover_and_approve: prometheus::Histogram,
}
/// Approval Voting metrics.
///
/// Wraps an optional set of registered collectors; when the inner value is `None`
/// (as produced by `Default`), every recording method is a no-op.
#[derive(Default, Clone)]
pub struct Metrics(Option<MetricsInner>);
impl Metrics {
fn on_candidate_imported(&self) {
if let Some(metrics) = &self.0 {
metrics.imported_candidates_total.inc();
}
}
fn on_assignment_produced(&self, tranche: DelayTranche) {
metrics.assignments_produced.observe(tranche as f64);
}
}
fn on_approval_stale(&self) {
if let Some(metrics) = &self.0 {
metrics.approvals_produced_total.with_label_values(&["stale"]).inc()
}
}
fn on_approval_invalid(&self) {
if let Some(metrics) = &self.0 {
metrics.approvals_produced_total.with_label_values(&["invalid"]).inc()
}
}
fn on_approval_unavailable(&self) {
if let Some(metrics) = &self.0 {
metrics.approvals_produced_total.with_label_values(&["unavailable"]).inc()
}
}
fn on_approval_error(&self) {
if let Some(metrics) = &self.0 {
metrics.approvals_produced_total.with_label_values(&["internal error"]).inc()
}
}
fn on_approval_produced(&self) {
if let Some(metrics) = &self.0 {
metrics.approvals_produced_total.with_label_values(&["success"]).inc()
fn on_no_shows(&self, n: usize) {
if let Some(metrics) = &self.0 {
metrics.no_shows_total.inc_by(n as u64);
}
}
fn on_wakeup(&self) {
if let Some(metrics) = &self.0 {
metrics.wakeups_triggered_total.inc();
}
}
fn on_candidate_approved(&self, ticks: Tick) {
if let Some(metrics) = &self.0 {
metrics.candidate_approval_time_ticks.observe(ticks as f64);
}
}
fn on_block_approved(&self, ticks: Tick) {
if let Some(metrics) = &self.0 {
metrics.block_approval_time_ticks.observe(ticks as f64);
}
}
fn time_db_transaction(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
self.0.as_ref().map(|metrics| metrics.time_db_transaction.start_timer())
}
fn time_recover_and_approve(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
self.0.as_ref().map(|metrics| metrics.time_recover_and_approve.start_timer())
}
}
impl metrics::Metrics for Metrics {
fn try_register(
registry: &prometheus::Registry,
) -> std::result::Result<Self, prometheus::PrometheusError> {
let metrics = MetricsInner {
imported_candidates_total: prometheus::register(
prometheus::Counter::new(
"parachain_imported_candidates_total",
"Number of candidates imported by the approval voting subsystem",
)?,
registry,
)?,
assignments_produced: prometheus::register(
prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(
"parachain_assignments_produced",
"Assignments and tranches produced by the approval voting subsystem",
).buckets(vec![0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 15.0, 25.0, 40.0, 70.0]),
)?,
registry,
)?,
approvals_produced_total: prometheus::register(
prometheus::CounterVec::new(
prometheus::Opts::new(
"parachain_approvals_produced_total",
"Number of approvals produced by the approval voting subsystem",
),
&["status"]
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
no_shows_total: prometheus::register(
prometheus::Counter::new(
"parachain_approvals_no_shows_total",
"Number of assignments which became no-shows in the approval voting subsystem",
)?,
registry,
)?,
wakeups_triggered_total: prometheus::register(
prometheus::Counter::new(
"parachain_approvals_wakeups_total",
"Number of times we woke up to process a candidate in the approval voting subsystem",
)?,
registry,
)?,
candidate_approval_time_ticks: prometheus::register(
prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(
"parachain_approvals_candidate_approval_time_ticks",
"Number of ticks (500ms) to approve candidates.",
).buckets(vec![6.0, 12.0, 18.0, 24.0, 30.0, 36.0, 72.0, 100.0, 144.0]),
)?,
registry,
)?,
block_approval_time_ticks: prometheus::register(
prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(
"parachain_approvals_blockapproval_time_ticks",
"Number of ticks (500ms) to approve blocks.",
).buckets(vec![6.0, 12.0, 18.0, 24.0, 30.0, 36.0, 72.0, 100.0, 144.0]),
)?,
registry,
)?,
time_db_transaction: prometheus::register(
prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(
"parachain_time_approval_db_transaction",
"Time spent writing an approval db transaction.",
)
)?,
registry,
)?,
time_recover_and_approve: prometheus::register(
prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(
"parachain_time_recover_and_approve",
"Time spent recovering and approving data in approval voting",
)
)?,
registry,
)?,
impl ApprovalVotingSubsystem {
/// Create a new approval voting subsystem with the given keystore, config, and database.
pub fn with_config(
config: Config,
keystore: Arc<LocalKeystore>,
sync_oracle: Box<dyn SyncOracle + Send>,
) -> Self {
ApprovalVotingSubsystem {
keystore,
slot_duration_millis: config.slot_duration_millis,
db,
db_config: DatabaseConfig {
col_data: config.col_data,
},
mode: Mode::Syncing(sync_oracle),
}
}
impl<C> Subsystem<C> for ApprovalVotingSubsystem
where C: SubsystemContext<Message = ApprovalVotingMessage>
{
fn start(self, ctx: C) -> SpawnedSubsystem {
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
ctx,
self,
Box::new(SystemClock),
Box::new(RealAssignmentCriteria),
)
.map_err(|e| SubsystemError::with_origin("approval-voting", e))
.boxed();
SpawnedSubsystem {
name: "approval-voting-subsystem",
future,
}
}
}
// A request travelling over the `mpsc` channel created in the main loop
// (`background_tx` / `background_rx`) and handled there by `handle_background_request`.
enum BackgroundRequest {
// Carries the data identifying an approval vote (see `ApprovalVoteRequest`).
ApprovalVote(ApprovalVoteRequest),
// Carries validation inputs plus a oneshot sender typed for the validation outcome.
// NOTE(review): presumably the main loop forwards this to candidate validation and
// answers on the sender — confirm against `handle_background_request`.
CandidateValidation(
PersistedValidationData,
ValidationCode,
CandidateDescriptor,
Arc<PoV>,
oneshot::Sender<Result<ValidationResult, ValidationFailed>>,
),
}
// Payload of `BackgroundRequest::ApprovalVote`.
struct ApprovalVoteRequest {
// Index of the validator the vote is for.
validator_index: ValidatorIndex,
// Hash of the relay block the candidate is considered under.
block_hash: Hash,
// Position of the candidate within that block.
candidate_index: usize,
}
// Bookkeeping for scheduled wakeups, indexed both by tick and by (block, candidate).
#[derive(Default)]
struct Wakeups {
// Tick -> [(Relay Block, Candidate Hash)]
wakeups: BTreeMap<Tick, Vec<(Hash, CandidateHash)>>,
// (Relay Block, Candidate Hash) -> tick of the wakeup currently scheduled for it.
reverse_wakeups: HashMap<(Hash, CandidateHash), Tick>,
// Block number -> hashes of the blocks noted at that number; used when pruning
// wakeups of finalized blocks.
block_numbers: BTreeMap<BlockNumber, HashSet<Hash>>,
}
impl Wakeups {
// Returns the first tick there exist wakeups for, if any.
fn first(&self) -> Option<Tick> {
self.wakeups.keys().next().map(|t| *t)
}
fn note_block(&mut self, block_hash: Hash, block_number: BlockNumber) {
self.block_numbers.entry(block_number).or_default().insert(block_hash);
}
// Schedules a wakeup at the given tick. no-op if there is already an earlier or equal wake-up
// for these values. replaces any later wakeup.
fn schedule(
&mut self,
block_hash: Hash,
block_number: BlockNumber,
candidate_hash: CandidateHash,
tick: Tick,
) {
if let Some(prev) = self.reverse_wakeups.get(&(block_hash, candidate_hash)) {
if prev <= &tick { return }
// we are replacing previous wakeup with an earlier one.
if let Entry::Occupied(mut entry) = self.wakeups.entry(*prev) {
if let Some(pos) = entry.get().iter()
.position(|x| x == &(block_hash, candidate_hash))
{
entry.get_mut().remove(pos);
}
if entry.get().is_empty() {
let _ = entry.remove_entry();
}
}
} else {
self.note_block(block_hash, block_number);
}
self.reverse_wakeups.insert((block_hash, candidate_hash), tick);
self.wakeups.entry(tick).or_default().push((block_hash, candidate_hash));
}
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
fn prune_finalized_wakeups(&mut self, finalized_number: BlockNumber) {
let after = self.block_numbers.split_off(&(finalized_number + 1));
let pruned_blocks: HashSet<_> = std::mem::replace(&mut self.block_numbers, after)
.into_iter()
.flat_map(|(_number, hashes)| hashes)
.collect();
let mut pruned_wakeups = BTreeMap::new();
self.reverse_wakeups.retain(|&(ref h, ref c_h), tick| {
let live = !pruned_blocks.contains(h);
if !live {
pruned_wakeups.entry(*tick)
.or_insert_with(HashSet::new)
.insert((*h, *c_h));
}
live
});
for (tick, pruned) in pruned_wakeups {
if let Entry::Occupied(mut entry) = self.wakeups.entry(tick) {
entry.get_mut().retain(|wakeup| !pruned.contains(wakeup));
if entry.get().is_empty() {
let _ = entry.remove();
}
}
}
}
// Get the wakeup for a particular block/candidate combo, if any.
fn wakeup_for(&self, block_hash: Hash, candidate_hash: CandidateHash) -> Option<Tick> {
self.reverse_wakeups.get(&(block_hash, candidate_hash)).map(|t| *t)
}
// Returns the next wakeup. this future never returns if there are no wakeups.
async fn next(&mut self, clock: &(dyn Clock + Sync)) -> (Tick, Hash, CandidateHash) {
match self.first() {
None => future::pending().await,
Some(tick) => {
clock.wait(tick).await;
match self.wakeups.entry(tick) {
Entry::Vacant(_) => panic!("entry is known to exist since `first` was `Some`; qed"),
Entry::Occupied(mut entry) => {
let (hash, candidate_hash) = entry.get_mut().pop()
.expect("empty entries are removed here and in `schedule`; no other mutation of this map; qed");
if entry.get().is_empty() {
let _ = entry.remove();
}
self.reverse_wakeups.remove(&(hash, candidate_hash));
(tick, hash, candidate_hash)
}
}
/// A read-only handle to a database.
///
/// Each method returns a `SubsystemResult`; the entry loaders yield `Ok(None)` when
/// nothing is stored under the requested key.
trait DBReader {
/// Load the entry stored for the block with the given hash, if any.
fn load_block_entry(
&self,
block_hash: &Hash,
) -> SubsystemResult<Option<BlockEntry>>;
/// Load the entry stored for the candidate with the given hash, if any.
fn load_candidate_entry(
&self,
candidate_hash: &CandidateHash,
) -> SubsystemResult<Option<CandidateEntry>>;
/// Load the hashes of all blocks known to the database.
fn load_all_blocks(&self) -> SubsystemResult<Vec<Hash>>;
}
// This is a submodule to enforce opacity of the inner DB type.
mod approval_db_v1_reader {
use super::{
DBReader, KeyValueDB, Hash, CandidateHash, BlockEntry, CandidateEntry,
SubsystemResult, SubsystemError, DatabaseConfig, approval_db,
};
/// A DB reader that uses the approval-db V1 under the hood.
pub(super) struct ApprovalDBV1Reader<T> {
inner: T,
config: DatabaseConfig,
}
impl<T> ApprovalDBV1Reader<T> {
pub(super) fn new(inner: T, config: DatabaseConfig) -> Self {
ApprovalDBV1Reader {
inner,
config,
}
impl<'a, T: 'a> DBReader for ApprovalDBV1Reader<T>
where T: std::ops::Deref<Target=(dyn KeyValueDB + 'a)>
{
fn load_block_entry(
&self,
block_hash: &Hash,
) -> SubsystemResult<Option<BlockEntry>> {
approval_db::v1::load_block_entry(&*self.inner, &self.config, block_hash)
.map(|e| e.map(Into::into))
.map_err(|e| SubsystemError::with_origin("approval-voting", e))
}
fn load_candidate_entry(
&self,
candidate_hash: &CandidateHash,
) -> SubsystemResult<Option<CandidateEntry>> {
approval_db::v1::load_candidate_entry(&*self.inner, &self.config, candidate_hash)
.map(|e| e.map(Into::into))
.map_err(|e| SubsystemError::with_origin("approval-voting", e))
}
fn load_all_blocks(&self) -> SubsystemResult<Vec<Hash>> {
approval_db::v1::load_all_blocks(&*self.inner, &self.config)
.map_err(|e| SubsystemError::with_origin("approval-voting", e))
}
}
}
use approval_db_v1_reader::ApprovalDBV1Reader;
// Snapshot produced by `State::approval_status` for one block/candidate pair.
struct ApprovalStatus {
// Result of `approval_checking::tranches_to_approve` for the approval entry.
required_tranches: RequiredTranches,
// The tranche reported by the clock for the block's slot at the time of the check.
tranche_now: DelayTranche,
// The tick corresponding to the block's slot.
block_tick: Tick,
}
session_window: RollingSessionWindow,
keystore: Arc<LocalKeystore>,
slot_duration_millis: u64,
db: T,
clock: Box<dyn Clock + Send + Sync>,
assignment_criteria: Box<dyn AssignmentCriteria + Send + Sync>,
}
impl<T> State<T> {
fn session_info(&self, i: SessionIndex) -> Option<&SessionInfo> {
self.session_window.session_info(i)
}
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
// Compute the required tranches for approval for this block and candidate combo.
// Fails if there is no approval entry for the block under the candidate or no candidate entry
// under the block, or if the session is out of bounds.
fn approval_status<'a, 'b>(
&'a self,
block_entry: &'a BlockEntry,
candidate_entry: &'b CandidateEntry,
) -> Option<(&'b ApprovalEntry, ApprovalStatus)> {
let session_info = match self.session_info(block_entry.session()) {
Some(s) => s,
None => {
tracing::warn!(target: LOG_TARGET, "Unknown session info for {}", block_entry.session());
return None;
}
};
let block_hash = block_entry.block_hash();
let tranche_now = self.clock.tranche_now(self.slot_duration_millis, block_entry.slot());
let block_tick = slot_number_to_tick(self.slot_duration_millis, block_entry.slot());
let no_show_duration = slot_number_to_tick(
self.slot_duration_millis,
Slot::from(u64::from(session_info.no_show_slots)),
);
if let Some(approval_entry) = candidate_entry.approval_entry(&block_hash) {
let required_tranches = approval_checking::tranches_to_approve(
approval_entry,
candidate_entry.approvals(),
tranche_now,
block_tick,
no_show_duration,
session_info.needed_approvals as _
);
let status = ApprovalStatus {
required_tranches,
block_tick,
tranche_now,
};
Some((approval_entry, status))
} else {
None
}
}
}
#[derive(Debug)]
enum Action {
ScheduleWakeup {
block_hash: Hash,
candidate_hash: CandidateHash,
tick: Tick,
},
WriteBlockEntry(BlockEntry),
WriteCandidateEntry(CandidateHash, CandidateEntry),
LaunchApproval {
indirect_cert: IndirectAssignmentCert,
assignment_tranche: DelayTranche,
relay_block_number: BlockNumber,
candidate_index: CandidateIndex,
session: SessionIndex,
candidate: CandidateReceipt,
asynchronous rob
committed
backing_group: GroupIndex,
// Handles of spawned background work, keyed by relay block number so that everything
// at or below a finalized number can be dropped at once (see `cleanup_background_tasks`).
type BackgroundTaskMap = BTreeMap<BlockNumber, Vec<RemoteHandle<()>>>;
// Main loop of the approval voting subsystem.
//
// NOTE(review): the opening of this function's signature (its name, generic parameter and
// the `ctx` parameter used below) is not visible in this view, and some call argument
// lists below look truncated — confirm against the full file.
//
// The loop waits on three event sources (scheduled wakeups, overseer messages and
// background requests), turns the event into a list of `Action`s, and passes them to
// `handle_actions`. It ends with `Ok(())` once `handle_actions` reports `true`, and with an
// error as soon as any step fails.
mut subsystem: ApprovalVotingSubsystem,
clock: Box<dyn Clock + Send + Sync>,
assignment_criteria: Box<dyn AssignmentCriteria + Send + Sync>,
) -> SubsystemResult<()>
where C: SubsystemContext<Message = ApprovalVotingMessage>
{
// Bounded channel (capacity 64) on which `BackgroundRequest`s arrive.
let (background_tx, background_rx) = mpsc::channel::<BackgroundRequest>(64);
let mut state = State {
session_window: RollingSessionWindow::new(APPROVAL_SESSIONS),
keystore: subsystem.keystore,
slot_duration_millis: subsystem.slot_duration_millis,
// Reads go through the opaque V1 reader; writes use `db_writer` below.
db: ApprovalDBV1Reader::new(subsystem.db.clone(), subsystem.db_config.clone()),
clock,
assignment_criteria,
};
let mut wakeups = Wakeups::default();
// map block numbers to background work.
let mut background_tasks = BTreeMap::new();
// `None` until a finalized height has been recorded by `handle_from_overseer`.
let mut last_finalized_height: Option<BlockNumber> = None;
let mut background_rx = background_rx.fuse();
let db_writer = &*subsystem.db;
loop {
let actions = futures::select! {
// A scheduled wakeup became due.
(tick, woken_block, woken_candidate) = wakeups.next(&*state.clock).fuse() => {
subsystem.metrics.on_wakeup();
process_wakeup(
&mut state,
woken_block,
woken_candidate,
}
// A signal or message from the overseer.
next_msg = ctx.recv().fuse() => {
let mut actions = handle_from_overseer(
&mut ctx,
&mut state,
next_msg?,
&mut last_finalized_height,
).await?;
// Background work for finalized blocks is no longer needed.
if let Some(finalized_height) = last_finalized_height {
cleanup_background_tasks(finalized_height, &mut background_tasks);
}
if let Mode::Syncing(ref mut oracle) = subsystem.mode {
if !oracle.is_major_syncing() {
// note that we're active before processing other actions.
actions.insert(0, Action::BecomeActive)
}
}
actions
}
// A request from background work; a closed channel yields no actions.
background_request = background_rx.next().fuse() => {
if let Some(req) = background_request {
handle_background_request(
&mut ctx,
&mut state,
req,
).await?
} else {
Vec::new()
}
}
};
// `handle_actions` returns `true` when one of the actions was `Conclude`.
if handle_actions(
&mut ctx,
&mut wakeups,
db_writer,
&mut background_tasks,
actions,
).await? {
break;
}
}
Ok(())
}
// Carries out the given actions: schedules wakeups, collects entry writes into a single
// database transaction, launches approval work and handles the switch to active mode.
//
// returns `true` if any of the actions was a `Conclude` command.
//
// NOTE(review): the body uses `db`, `db_config`, `wakeups`, `mode`, `metrics`,
// `block_number` and `assignment_tranche`, none of which is declared in the parameter list
// or patterns visible here. The lines reading "asynchronous rob" / "committed" are not Rust
// and look like extraction residue, and the `LaunchApproval` arm is not closed. Confirm
// against the full file before relying on this block.
async fn handle_actions(
ctx: &mut impl SubsystemContext,
background_tx: &mpsc::Sender<BackgroundRequest>,
background_tasks: &mut BackgroundTaskMap,
actions: impl IntoIterator<Item = Action>,
) -> SubsystemResult<bool> {
// Entry writes are batched and written once after all actions are processed.
let mut transaction = approval_db::v1::Transaction::new(db_config);
let mut conclude = false;
for action in actions {
match action {
Action::ScheduleWakeup {
block_hash,
candidate_hash,
tick,
} => {
wakeups.schedule(block_hash, block_number, candidate_hash, tick)
}
Action::WriteBlockEntry(block_entry) => {
transaction.put_block_entry(block_entry.into());
}
Action::WriteCandidateEntry(candidate_hash, candidate_entry) => {
transaction.put_candidate_entry(candidate_hash, candidate_entry.into());
}
Action::LaunchApproval {
indirect_cert,
relay_block_number,
candidate_index,
session,
candidate,
asynchronous rob
committed
backing_group,
// Don't launch approval work if the node is syncing.
if let Mode::Syncing(_) = *mode { continue }
metrics.on_assignment_produced(assignment_tranche);
// Copy what is needed out of the cert before it is moved into the message below.
let block_hash = indirect_cert.block_hash;
let validator_index = indirect_cert.validator;
ctx.send_unbounded_message(ApprovalDistributionMessage::DistributeAssignment(
indirect_cert,
candidate_index,
).into());
let handle = launch_approval(
background_tx.clone(),
session,
&candidate,
validator_index,
block_hash,
candidate_index as _,
asynchronous rob
committed
backing_group,
).await?;
// Keep the handle under the relay block number so it can be dropped on finality.
if let Some(handle) = handle {
background_tasks.entry(relay_block_number).or_default().push(handle);
}
Action::BecomeActive => {
*mode = Mode::Active;
// Build the activation messages from the database contents and send them on.
let messages = distribution_messages_for_activation(
ApprovalDBV1Reader::new(db, db_config)
)?;
ctx.send_messages(messages.into_iter().map(Into::into)).await;
}
Action::Conclude => { conclude = true; }
}
}
// Skip the write (and its timer) when no entry was queued.
if !transaction.is_empty() {
let _timer = metrics.time_db_transaction();
transaction.write(db)
.map_err(|e| SubsystemError::with_origin("approval-voting", e))?;
}
Ok(conclude)
}
// Discards every background task registered at or below the finalized block number.
//
// Dropping a `RemoteHandle` cancels its task, so throwing away the finalized part of the
// map is all the cleanup that is required.
fn cleanup_background_tasks(
    current_finalized_block: BlockNumber,
    tasks: &mut BackgroundTaskMap,
) {
    let first_live_number = current_finalized_block + 1;
    let still_live = tasks.split_off(&first_live_number);
    // What is swapped out holds only entries up to the finalized block.
    drop(std::mem::replace(tasks, still_live));
}
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
// Builds the messages handed to approval-distribution when the subsystem becomes active.
//
// The first message is always `NewBlocks` with the metadata of every block entry found in
// the database. It is followed, in block and candidate order, by our own assignment for
// each candidate and, where we also approved, our approval vote. Missing entries are
// logged and skipped; database errors abort the whole operation.
fn distribution_messages_for_activation<'a>(
    db: impl DBReader + 'a,
) -> SubsystemResult<Vec<ApprovalDistributionMessage>> {
    let all_blocks = db.load_all_blocks()?;

    let mut approval_meta = Vec::with_capacity(all_blocks.len());
    // Our own statements; `NewBlocks` is put in front of them at the very end.
    let mut statements = Vec::new();

    for block_hash in all_blocks {
        let block_entry = if let Some(entry) = db.load_block_entry(&block_hash)? {
            entry
        } else {
            tracing::warn!(
                target: LOG_TARGET,
                ?block_hash,
                "Missing block entry",
            );
            continue
        };

        approval_meta.push(BlockApprovalMeta {
            hash: block_hash,
            number: block_entry.block_number(),
            parent_hash: block_entry.parent_hash(),
            candidates: block_entry.candidates().iter().map(|(_, c_hash)| *c_hash).collect(),
            slot: block_entry.slot(),
        });

        for (i, (_, candidate_hash)) in block_entry.candidates().iter().enumerate() {
            let candidate_entry = if let Some(entry) = db.load_candidate_entry(&candidate_hash)? {
                entry
            } else {
                tracing::warn!(
                    target: LOG_TARGET,
                    ?block_hash,
                    ?candidate_hash,
                    "Missing candidate entry",
                );
                continue
            };

            let approval_entry = match candidate_entry.approval_entry(&block_hash) {
                Some(entry) => entry,
                None => {
                    tracing::warn!(
                        target: LOG_TARGET,
                        ?block_hash,
                        ?candidate_hash,
                        "Missing approval entry",
                    );
                    continue
                }
            };

            let (assignment, approval_sig) = match approval_entry.local_statements() {
                (Some(assignment), approval_sig) => (assignment, approval_sig),
                // Without a local assignment there is nothing to distribute; an approval
                // without an assignment is an impossible case.
                (None, _) => continue,
            };

            statements.push(ApprovalDistributionMessage::DistributeAssignment(
                IndirectAssignmentCert {
                    block_hash,
                    validator: assignment.validator_index(),
                    cert: assignment.cert().clone(),
                },
                i as _,
            ));

            if let Some(signature) = approval_sig {
                statements.push(ApprovalDistributionMessage::DistributeApproval(
                    IndirectSignedApprovalVote {
                        block_hash,
                        candidate_index: i as _,
                        validator: assignment.validator_index(),
                        signature,
                    }
                ));
            }
        }
    }

    let mut messages = Vec::with_capacity(statements.len() + 1);
    messages.push(ApprovalDistributionMessage::NewBlocks(approval_meta));
    messages.extend(statements);
    Ok(messages)
}
// Handle an incoming signal from the overseer. Returns true if execution should conclude.
async fn handle_from_overseer(
ctx: &mut impl SubsystemContext,
state: &mut State<impl DBReader>,
db_writer: &dyn KeyValueDB,
x: FromOverseer<ApprovalVotingMessage>,
last_finalized_height: &mut Option<BlockNumber>,
) -> SubsystemResult<Vec<Action>> {
let actions = match x {
FromOverseer::Signal(OverseerSignal::ActiveLeaves(update)) => {
let mut actions = Vec::new();
for activated in update.activated {
let head = activated.hash;
match import::handle_new_head(
ctx,
state,
db_writer,
head,
&*last_finalized_height,
).await {
Err(e) => return Err(SubsystemError::with_origin("db", e)),
Ok(block_imported_candidates) => {
// Schedule wakeups for all imported candidates.
for block_batch in block_imported_candidates {
tracing::debug!(
target: LOG_TARGET,
block_hash = ?block_batch.block_hash,
num_candidates = block_batch.imported_candidates.len(),
"Imported new block.",
for (c_hash, c_entry) in block_batch.imported_candidates {
let our_tranche = c_entry
.approval_entry(&block_batch.block_hash)
.and_then(|a| a.our_assignment().map(|a| a.tranche()));
if let Some(our_tranche) = our_tranche {
let tick = our_tranche as Tick + block_batch.block_tick;
tracing::trace!(
target: LOG_TARGET,
tranche = our_tranche,
candidate_hash = ?c_hash,
block_hash = ?block_batch.block_hash,
block_tick = block_batch.block_tick,
"Scheduling first wakeup.",
// Our first wakeup will just be the tranche of our assignment,
// if any. This will likely be superseded by incoming assignments
// and approvals which trigger rescheduling.
actions.push(Action::ScheduleWakeup {
block_hash: block_batch.block_hash,
block_number: block_batch.block_number,