Newer
Older
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! The Approval Voting Subsystem.
//!
//! This subsystem is responsible for determining candidates to do approval checks
//! on, performing those approval checks, and tracking the assignments and approvals
//! of others. It uses this information to determine when candidates and blocks have
//! been sufficiently approved to finalize.
AssignmentCheckError, AssignmentCheckResult, ApprovalCheckError, ApprovalCheckResult,
ApprovalVotingMessage, RuntimeApiMessage, RuntimeApiRequest, ChainApiMessage,
ApprovalDistributionMessage, ValidationFailed, CandidateValidationMessage,
AvailabilityRecoveryMessage,
},
errors::RecoveryError,
Subsystem, SubsystemContext, SubsystemError, SubsystemResult, SpawnedSubsystem,
FromOverseer, OverseerSignal,
};
use polkadot_node_subsystem_util::{
metrics::{self, prometheus},
rolling_session_window::RollingSessionWindow,
use polkadot_primitives::v1::{
ValidatorIndex, Hash, SessionIndex, SessionInfo, CandidateHash,
CandidateReceipt, BlockNumber, PersistedValidationData,
ValidationCode, CandidateDescriptor, ValidatorPair, ValidatorSignature, ValidatorId,
CandidateIndex, GroupIndex, ApprovalVote,
use polkadot_node_primitives::{ValidationResult, PoV};
use polkadot_node_primitives::approval::{
IndirectAssignmentCert, IndirectSignedApprovalVote, DelayTranche, BlockApprovalMeta,
use polkadot_node_jaeger as jaeger;
use sc_keystore::LocalKeystore;
use sp_consensus_slots::Slot;
use sp_runtime::traits::AppVerify;
use sp_application_crypto::Pair;
use futures::future::RemoteHandle;
use futures::channel::{mpsc, oneshot};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::collections::btree_map::Entry;
use std::sync::Arc;
use approval_checking::RequiredTranches;
use persisted_entries::{ApprovalEntry, CandidateEntry, BlockEntry};
use criteria::{AssignmentCriteria, RealAssignmentCriteria};
use time::{slot_number_to_tick, Tick, Clock, ClockExt, SystemClock};
mod approval_checking;
mod approval_db;
mod criteria;
mod import;
mod time;
mod persisted_entries;
use crate::approval_db::v1::Config as DatabaseConfig;
#[cfg(test)]
mod tests;
const APPROVAL_SESSIONS: SessionIndex = 6;
const LOG_TARGET: &str = "parachain::approval-voting";
/// Configuration for the approval voting subsystem
/// The column family in the DB where approval-voting data is stored.
pub col_data: u32,
/// The slot duration of the consensus algorithm, in milliseconds. Should be evenly
/// divisible by 500.
pub slot_duration_millis: u64,
}
/// The mode of the approval voting subsystem. It should start in a `Syncing` mode when it first
/// starts, and then once it's reached the head of the chain it should move into the `Active` mode.
///
/// In `Active` mode, the node is an active participant in the approvals protocol. When syncing,
/// the node follows the new incoming blocks and finalized number, but does not yet participate.
///
/// When transitioning from `Syncing` to `Active`, the node notifies the `ApprovalDistribution`
/// subsystem of all unfinalized blocks and the candidates included within them, as well as all
/// votes that the local node itself has cast on candidates within those blocks.
enum Mode {
/// The node takes part in the approvals protocol.
Active,
/// The node is still catching up. The boxed oracle is polled via `is_major_syncing()`
/// to decide when to switch to `Active`.
Syncing(Box<dyn SyncOracle + Send>),
}
/// The approval voting subsystem.
///
/// NOTE(review): `with_config` also initializes `db_config` and `mode` fields, and the main
/// loop reads them, but they are not declared in the lines visible here — confirm the
/// full field list against the complete file.
pub struct ApprovalVotingSubsystem {
/// LocalKeystore is needed for assignment keys, but not necessarily approval keys.
///
/// We do a lot of VRF signing and need the keys to have low latency.
keystore: Arc<LocalKeystore>,
/// Slot duration in milliseconds, copied from `Config::slot_duration_millis`.
slot_duration_millis: u64,
/// Shared handle to the key-value database; used both for reading entries and for
/// writing transactions.
db: Arc<dyn KeyValueDB>,
/// Metrics handle; recording is a no-op when it wraps `None`.
metrics: Metrics,
}
/// The registered Prometheus collectors wrapped by `Metrics`.
#[derive(Clone)]
struct MetricsInner {
// Number of candidates imported by the approval voting subsystem.
imported_candidates_total: prometheus::Counter<prometheus::U64>,
// Number of assignments produced by the approval voting subsystem.
assignments_produced_total: prometheus::Counter<prometheus::U64>,
// Number of approvals produced by the approval voting subsystem.
approvals_produced_total: prometheus::Counter<prometheus::U64>,
// Number of assignments which became no-shows.
no_shows_total: prometheus::Counter<prometheus::U64>,
// Number of times the subsystem woke up to process a candidate.
wakeups_triggered_total: prometheus::Counter<prometheus::U64>,
// Number of ticks (500ms) taken to approve candidates.
candidate_approval_time_ticks: prometheus::Histogram,
// Number of ticks (500ms) taken to approve blocks.
block_approval_time_ticks: prometheus::Histogram,
// Time spent writing an approval db transaction.
time_db_transaction: prometheus::Histogram,
// Time spent recovering and approving data.
time_recover_and_approve: prometheus::Histogram,
}
/// Approval Voting metrics.
///
/// Wraps an optional set of collectors: when the inner value is `None` (which is what
/// the derived `Default` produces), every recording method is a no-op and the timer
/// methods return `None`.
#[derive(Default, Clone)]
pub struct Metrics(Option<MetricsInner>);
impl Metrics {
/// Counts one imported candidate; does nothing when no collectors are registered.
fn on_candidate_imported(&self) {
    match self.0.as_ref() {
        Some(inner) => inner.imported_candidates_total.inc(),
        None => {}
    }
}
/// Counts one produced assignment; does nothing when no collectors are registered.
fn on_assignment_produced(&self) {
    match self.0.as_ref() {
        Some(inner) => inner.assignments_produced_total.inc(),
        None => {}
    }
}
/// Counts one produced approval; does nothing when no collectors are registered.
fn on_approval_produced(&self) {
    match self.0.as_ref() {
        Some(inner) => inner.approvals_produced_total.inc(),
        None => {}
    }
}
/// Adds `n` to the no-show counter; does nothing when no collectors are registered.
fn on_no_shows(&self, n: usize) {
    match self.0.as_ref() {
        Some(inner) => inner.no_shows_total.inc_by(n as u64),
        None => {}
    }
}
/// Counts one triggered wakeup; does nothing when no collectors are registered.
fn on_wakeup(&self) {
    match self.0.as_ref() {
        Some(inner) => inner.wakeups_triggered_total.inc(),
        None => {}
    }
}
/// Records how many ticks a candidate took to be approved; no-op without collectors.
fn on_candidate_approved(&self, ticks: Tick) {
    match self.0.as_ref() {
        Some(inner) => inner.candidate_approval_time_ticks.observe(ticks as f64),
        None => {}
    }
}
/// Records how many ticks a block took to be approved; no-op without collectors.
fn on_block_approved(&self, ticks: Tick) {
    match self.0.as_ref() {
        Some(inner) => inner.block_approval_time_ticks.observe(ticks as f64),
        None => {}
    }
}
/// Starts a timer on the db-transaction histogram, or returns `None` when no
/// collectors are registered.
fn time_db_transaction(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
    let inner = self.0.as_ref()?;
    Some(inner.time_db_transaction.start_timer())
}
/// Starts a timer on the recover-and-approve histogram, or returns `None` when no
/// collectors are registered.
fn time_recover_and_approve(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
    let inner = self.0.as_ref()?;
    Some(inner.time_recover_and_approve.start_timer())
}
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
}
impl metrics::Metrics for Metrics {
fn try_register(
registry: &prometheus::Registry,
) -> std::result::Result<Self, prometheus::PrometheusError> {
let metrics = MetricsInner {
imported_candidates_total: prometheus::register(
prometheus::Counter::new(
"parachain_imported_candidates_total",
"Number of candidates imported by the approval voting subsystem",
)?,
registry,
)?,
assignments_produced_total: prometheus::register(
prometheus::Counter::new(
"parachain_assignments_produced_total",
"Number of assignments produced by the approval voting subsystem",
)?,
registry,
)?,
approvals_produced_total: prometheus::register(
prometheus::Counter::new(
"parachain_approvals_produced_total",
"Number of approvals produced by the approval voting subsystem",
)?,
registry,
)?,
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
no_shows_total: prometheus::register(
prometheus::Counter::new(
"parachain_approvals_no_shows_total",
"Number of assignments which became no-shows in the approval voting subsystem",
)?,
registry,
)?,
wakeups_triggered_total: prometheus::register(
prometheus::Counter::new(
"parachain_approvals_wakeups_total",
"Number of times we woke up to process a candidate in the approval voting subsystem",
)?,
registry,
)?,
candidate_approval_time_ticks: prometheus::register(
prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(
"parachain_approvals_candidate_approval_time_ticks",
"Number of ticks (500ms) to approve candidates.",
).buckets(vec![6.0, 12.0, 18.0, 24.0, 30.0, 36.0, 72.0, 100.0, 144.0]),
)?,
registry,
)?,
block_approval_time_ticks: prometheus::register(
prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(
"parachain_approvals_blockapproval_time_ticks",
"Number of ticks (500ms) to approve blocks.",
).buckets(vec![6.0, 12.0, 18.0, 24.0, 30.0, 36.0, 72.0, 100.0, 144.0]),
)?,
registry,
)?,
time_db_transaction: prometheus::register(
prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(
"parachain_time_approval_db_transaction",
"Time spent writing an approval db transaction.",
)
)?,
registry,
)?,
time_recover_and_approve: prometheus::register(
prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(
"parachain_time_recover_and_approve",
"Time spent recovering and approving data in approval voting",
)
)?,
registry,
)?,
impl ApprovalVotingSubsystem {
/// Create a new approval voting subsystem with the given keystore, config, and database.
///
/// The slot duration and the database column are taken from `config`. The subsystem
/// starts out in `Mode::Syncing`, holding `sync_oracle`.
///
/// NOTE(review): the body uses a `db` binding that is not among the parameters visible
/// here — confirm the full parameter list against the complete file.
pub fn with_config(
config: Config,
keystore: Arc<LocalKeystore>,
sync_oracle: Box<dyn SyncOracle + Send>,
) -> Self {
ApprovalVotingSubsystem {
keystore,
slot_duration_millis: config.slot_duration_millis,
db,
db_config: DatabaseConfig {
col_data: config.col_data,
},
// Start in syncing mode, keeping the oracle inside the mode value.
mode: Mode::Syncing(sync_oracle),
}
}
// Lets the approval voting subsystem be started with any context whose message type is
// `ApprovalVotingMessage`.
impl<C> Subsystem<C> for ApprovalVotingSubsystem
where C: SubsystemContext<Message = ApprovalVotingMessage>
{
// Packages the subsystem's future as a `SpawnedSubsystem` named
// "approval-voting-subsystem". The future is given the context, the subsystem itself,
// a `SystemClock` and `RealAssignmentCriteria`; its errors are tagged with the
// "approval-voting" origin before it is boxed.
//
// NOTE(review): the statement that binds `future` is not visible in this view (only
// its trailing arguments are) — confirm against the complete file.
fn start(self, ctx: C) -> SpawnedSubsystem {
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
ctx,
self,
Box::new(SystemClock),
Box::new(RealAssignmentCriteria),
)
.map_err(|e| SubsystemError::with_origin("approval-voting", e))
.boxed();
SpawnedSubsystem {
name: "approval-voting-subsystem",
future,
}
}
}
// A request travelling over the bounded `mpsc` channel created in the main loop
// (`background_tx` / `background_rx`) and handled there by `handle_background_request`.
enum BackgroundRequest {
// Request concerning an approval vote; see `ApprovalVoteRequest` for the payload.
ApprovalVote(ApprovalVoteRequest),
// Request to validate a candidate. Carries the persisted validation data, the
// validation code, the candidate descriptor, the PoV, and a oneshot sender on which
// the validation outcome is returned.
CandidateValidation(
PersistedValidationData,
ValidationCode,
CandidateDescriptor,
Arc<PoV>,
oneshot::Sender<Result<ValidationResult, ValidationFailed>>,
),
}
// Payload of `BackgroundRequest::ApprovalVote`.
struct ApprovalVoteRequest {
// Index of the validator the vote belongs to.
// NOTE(review): presumably the local validator's index — confirm against the code that
// builds this request.
validator_index: ValidatorIndex,
// Hash of the relay block the vote is cast under.
block_hash: Hash,
// Position of the candidate within that block.
candidate_index: usize,
}
// Bookkeeping for scheduled wakeups. The three maps are kept in step with each other by
// `schedule`, `prune_finalized_wakeups` and `next`.
#[derive(Default)]
struct Wakeups {
// Tick -> [(Relay Block, Candidate Hash)]
wakeups: BTreeMap<Tick, Vec<(Hash, CandidateHash)>>,
// (Relay Block, Candidate Hash) -> the tick it is currently scheduled at.
reverse_wakeups: HashMap<(Hash, CandidateHash), Tick>,
// Block number -> hashes of noted blocks at that height; used to find which blocks
// to prune once a height is finalized.
block_numbers: BTreeMap<BlockNumber, HashSet<Hash>>,
}
impl Wakeups {
// The earliest tick that has at least one wakeup scheduled, or `None` if nothing is
// scheduled.
fn first(&self) -> Option<Tick> {
    let mut ticks = self.wakeups.keys();
    ticks.next().copied()
}
// Remembers that `block_hash` sits at height `block_number`, so that its wakeups can be
// found again when that height is pruned.
fn note_block(&mut self, block_hash: Hash, block_number: BlockNumber) {
    let hashes_at_height = self.block_numbers.entry(block_number).or_default();
    hashes_at_height.insert(block_hash);
}
// Schedules a wakeup at the given tick. no-op if there is already an earlier or equal wake-up
// for these values. replaces any later wakeup.
fn schedule(
&mut self,
block_hash: Hash,
block_number: BlockNumber,
candidate_hash: CandidateHash,
tick: Tick,
) {
if let Some(prev) = self.reverse_wakeups.get(&(block_hash, candidate_hash)) {
if prev <= &tick { return }
// we are replacing previous wakeup with an earlier one.
if let Entry::Occupied(mut entry) = self.wakeups.entry(*prev) {
if let Some(pos) = entry.get().iter()
.position(|x| x == &(block_hash, candidate_hash))
{
entry.get_mut().remove(pos);
}
if entry.get().is_empty() {
let _ = entry.remove_entry();
}
}
} else {
self.note_block(block_hash, block_number);
}
self.reverse_wakeups.insert((block_hash, candidate_hash), tick);
self.wakeups.entry(tick).or_default().push((block_hash, candidate_hash));
}
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
fn prune_finalized_wakeups(&mut self, finalized_number: BlockNumber) {
let after = self.block_numbers.split_off(&(finalized_number + 1));
let pruned_blocks: HashSet<_> = std::mem::replace(&mut self.block_numbers, after)
.into_iter()
.flat_map(|(_number, hashes)| hashes)
.collect();
let mut pruned_wakeups = BTreeMap::new();
self.reverse_wakeups.retain(|&(ref h, ref c_h), tick| {
let live = !pruned_blocks.contains(h);
if !live {
pruned_wakeups.entry(*tick)
.or_insert_with(HashSet::new)
.insert((*h, *c_h));
}
live
});
for (tick, pruned) in pruned_wakeups {
if let Entry::Occupied(mut entry) = self.wakeups.entry(tick) {
entry.get_mut().retain(|wakeup| !pruned.contains(wakeup));
if entry.get().is_empty() {
let _ = entry.remove();
}
}
}
}
// The tick currently scheduled for this block/candidate pair, if there is one.
fn wakeup_for(&self, block_hash: Hash, candidate_hash: CandidateHash) -> Option<Tick> {
    let key = (block_hash, candidate_hash);
    self.reverse_wakeups.get(&key).copied()
}
// Returns the next wakeup. this future never returns if there are no wakeups.
async fn next(&mut self, clock: &(dyn Clock + Sync)) -> (Tick, Hash, CandidateHash) {
match self.first() {
None => future::pending().await,
Some(tick) => {
clock.wait(tick).await;
match self.wakeups.entry(tick) {
Entry::Vacant(_) => panic!("entry is known to exist since `first` was `Some`; qed"),
Entry::Occupied(mut entry) => {
let (hash, candidate_hash) = entry.get_mut().pop()
.expect("empty entries are removed here and in `schedule`; no other mutation of this map; qed");
if entry.get().is_empty() {
let _ = entry.remove();
}
self.reverse_wakeups.remove(&(hash, candidate_hash));
(tick, hash, candidate_hash)
}
}
/// A read-only handle to a database.
trait DBReader {
/// Load the block entry stored for `block_hash`. `Ok(None)` means no entry was found.
fn load_block_entry(
&self,
block_hash: &Hash,
) -> SubsystemResult<Option<BlockEntry>>;
/// Load the candidate entry stored for `candidate_hash`. `Ok(None)` means no entry
/// was found.
fn load_candidate_entry(
&self,
candidate_hash: &CandidateHash,
) -> SubsystemResult<Option<CandidateEntry>>;
/// Load the hashes of all blocks known to the database.
fn load_all_blocks(&self) -> SubsystemResult<Vec<Hash>>;
}
// This is a submodule to enforce opacity of the inner DB type.
mod approval_db_v1_reader {
use super::{
DBReader, KeyValueDB, Hash, CandidateHash, BlockEntry, CandidateEntry,
SubsystemResult, SubsystemError, DatabaseConfig, approval_db,
};
/// A DB reader that uses the approval-db V1 under the hood.
pub(super) struct ApprovalDBV1Reader<T> {
// The database handle. The `DBReader` impl requires it to dereference to a
// `KeyValueDB` trait object.
inner: T,
// Database configuration handed to every `approval_db::v1` load call.
config: DatabaseConfig,
}
impl<T> ApprovalDBV1Reader<T> {
pub(super) fn new(inner: T, config: DatabaseConfig) -> Self {
ApprovalDBV1Reader {
inner,
config,
}
// Each method forwards to the matching `approval_db::v1` loader, converts a found entry
// into the subsystem's own entry type, and tags any error with the "approval-voting"
// origin.
impl<'a, T: 'a> DBReader for ApprovalDBV1Reader<T>
    where T: std::ops::Deref<Target=(dyn KeyValueDB + 'a)>
{
    fn load_block_entry(
        &self,
        block_hash: &Hash,
    ) -> SubsystemResult<Option<BlockEntry>> {
        match approval_db::v1::load_block_entry(&*self.inner, &self.config, block_hash) {
            Ok(found) => Ok(found.map(Into::into)),
            Err(err) => Err(SubsystemError::with_origin("approval-voting", err)),
        }
    }

    fn load_candidate_entry(
        &self,
        candidate_hash: &CandidateHash,
    ) -> SubsystemResult<Option<CandidateEntry>> {
        match approval_db::v1::load_candidate_entry(&*self.inner, &self.config, candidate_hash) {
            Ok(found) => Ok(found.map(Into::into)),
            Err(err) => Err(SubsystemError::with_origin("approval-voting", err)),
        }
    }

    fn load_all_blocks(&self) -> SubsystemResult<Vec<Hash>> {
        match approval_db::v1::load_all_blocks(&*self.inner, &self.config) {
            Ok(hashes) => Ok(hashes),
            Err(err) => Err(SubsystemError::with_origin("approval-voting", err)),
        }
    }
}
}
use approval_db_v1_reader::ApprovalDBV1Reader;
// Snapshot produced by `State::approval_status` for one block/candidate pair.
struct ApprovalStatus {
// Result of `approval_checking::tranches_to_approve` for the pair.
required_tranches: RequiredTranches,
// The tranche the clock reports as current for the block's slot.
tranche_now: DelayTranche,
// The tick corresponding to the block's slot.
block_tick: Tick,
}
session_window: RollingSessionWindow,
keystore: Arc<LocalKeystore>,
slot_duration_millis: u64,
db: T,
clock: Box<dyn Clock + Send + Sync>,
assignment_criteria: Box<dyn AssignmentCriteria + Send + Sync>,
}
impl<T> State<T> {
// Looks up the info for session `i` in the rolling session window; `None` when the
// window has nothing for that index.
fn session_info(&self, i: SessionIndex) -> Option<&SessionInfo> {
self.session_window.session_info(i)
}
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
// Works out which tranches are required for this candidate to be approved under this
// block, along with the current tranche and the block's tick.
//
// Yields `None` when the session info for the block's session is not available (a
// warning is logged) or when the candidate has no approval entry for the block.
fn approval_status<'a, 'b>(
    &'a self,
    block_entry: &'a BlockEntry,
    candidate_entry: &'b CandidateEntry,
) -> Option<(&'b ApprovalEntry, ApprovalStatus)> {
    let session_info = self.session_info(block_entry.session()).or_else(|| {
        tracing::warn!(target: LOG_TARGET, "Unknown session info for {}", block_entry.session());
        None
    })?;

    let slot_millis = self.slot_duration_millis;
    let block_hash = block_entry.block_hash();

    // Timing inputs are derived before the approval-entry lookup, as in the original flow.
    let tranche_now = self.clock.tranche_now(slot_millis, block_entry.slot());
    let block_tick = slot_number_to_tick(slot_millis, block_entry.slot());
    let no_show_duration = slot_number_to_tick(
        slot_millis,
        Slot::from(u64::from(session_info.no_show_slots)),
    );

    let approval_entry = candidate_entry.approval_entry(&block_hash)?;

    let required_tranches = approval_checking::tranches_to_approve(
        approval_entry,
        candidate_entry.approvals(),
        tranche_now,
        block_tick,
        no_show_duration,
        session_info.needed_approvals as _
    );

    Some((
        approval_entry,
        ApprovalStatus {
            required_tranches,
            tranche_now,
            block_tick,
        },
    ))
}
}
#[derive(Debug)]
enum Action {
ScheduleWakeup {
block_hash: Hash,
candidate_hash: CandidateHash,
tick: Tick,
},
WriteBlockEntry(BlockEntry),
WriteCandidateEntry(CandidateHash, CandidateEntry),
LaunchApproval {
indirect_cert: IndirectAssignmentCert,
relay_block_number: BlockNumber,
candidate_index: CandidateIndex,
session: SessionIndex,
candidate: CandidateReceipt,
asynchronous rob
committed
backing_group: GroupIndex,
// Maps a block number to the handles of the background work launched for it. Entries at
// or below the finalized height are dropped by `cleanup_background_tasks`.
type BackgroundTaskMap = BTreeMap<BlockNumber, Vec<RemoteHandle<()>>>;
mut subsystem: ApprovalVotingSubsystem,
clock: Box<dyn Clock + Send + Sync>,
assignment_criteria: Box<dyn AssignmentCriteria + Send + Sync>,
) -> SubsystemResult<()>
where C: SubsystemContext<Message = ApprovalVotingMessage>
{
let (background_tx, background_rx) = mpsc::channel::<BackgroundRequest>(64);
let mut state = State {
session_window: RollingSessionWindow::new(APPROVAL_SESSIONS),
keystore: subsystem.keystore,
slot_duration_millis: subsystem.slot_duration_millis,
db: ApprovalDBV1Reader::new(subsystem.db.clone(), subsystem.db_config.clone()),
clock,
assignment_criteria,
};
let mut wakeups = Wakeups::default();
// map block numbers to background work.
let mut background_tasks = BTreeMap::new();
let mut last_finalized_height: Option<BlockNumber> = None;
let mut background_rx = background_rx.fuse();
let db_writer = &*subsystem.db;
loop {
let actions = futures::select! {
(tick, woken_block, woken_candidate) = wakeups.next(&*state.clock).fuse() => {
subsystem.metrics.on_wakeup();
process_wakeup(
&mut state,
woken_block,
woken_candidate,
}
next_msg = ctx.recv().fuse() => {
let mut actions = handle_from_overseer(
&mut ctx,
&mut state,
next_msg?,
&mut last_finalized_height,
).await?;
if let Some(finalized_height) = last_finalized_height {
cleanup_background_tasks(finalized_height, &mut background_tasks);
}
if let Mode::Syncing(ref mut oracle) = subsystem.mode {
if !oracle.is_major_syncing() {
// note that we're active before processing other actions.
actions.insert(0, Action::BecomeActive)
}
}
actions
}
background_request = background_rx.next().fuse() => {
if let Some(req) = background_request {
handle_background_request(
&mut ctx,
&mut state,
req,
).await?
} else {
Vec::new()
}
}
};
if handle_actions(
&mut ctx,
&mut wakeups,
db_writer,
&mut background_tasks,
actions,
).await? {
break;
}
}
Ok(())
}
// returns `true` if any of the actions was a `Conclude` command.
async fn handle_actions(
ctx: &mut impl SubsystemContext,
background_tx: &mpsc::Sender<BackgroundRequest>,
background_tasks: &mut BackgroundTaskMap,
actions: impl IntoIterator<Item = Action>,
) -> SubsystemResult<bool> {
let mut transaction = approval_db::v1::Transaction::new(db_config);
let mut conclude = false;
for action in actions {
match action {
Action::ScheduleWakeup {
block_hash,
candidate_hash,
tick,
} => {
wakeups.schedule(block_hash, block_number, candidate_hash, tick)
}
Action::WriteBlockEntry(block_entry) => {
transaction.put_block_entry(block_entry.into());
}
Action::WriteCandidateEntry(candidate_hash, candidate_entry) => {
transaction.put_candidate_entry(candidate_hash, candidate_entry.into());
}
Action::LaunchApproval {
indirect_cert,
relay_block_number,
candidate_index,
session,
candidate,
asynchronous rob
committed
backing_group,
// Don't launch approval work if the node is syncing.
if let Mode::Syncing(_) = *mode { continue }
let block_hash = indirect_cert.block_hash;
let validator_index = indirect_cert.validator;
ctx.send_unbounded_message(ApprovalDistributionMessage::DistributeAssignment(
indirect_cert,
candidate_index,
).into());
let handle = launch_approval(
background_tx.clone(),
session,
&candidate,
validator_index,
block_hash,
candidate_index as _,
asynchronous rob
committed
backing_group,
).await?;
if let Some(handle) = handle {
background_tasks.entry(relay_block_number).or_default().push(handle);
}
Action::BecomeActive => {
*mode = Mode::Active;
let messages = distribution_messages_for_activation(
ApprovalDBV1Reader::new(db, db_config)
)?;
ctx.send_messages(messages.into_iter().map(Into::into)).await;
}
Action::Conclude => { conclude = true; }
}
}
if !transaction.is_empty() {
let _timer = metrics.time_db_transaction();
transaction.write(db)
.map_err(|e| SubsystemError::with_origin("approval-voting", e))?;
}
Ok(conclude)
}
// Discards every background task registered at or below the finalized block number,
// since that work is no longer needed.
fn cleanup_background_tasks(
    current_finalized_block: BlockNumber,
    tasks: &mut BackgroundTaskMap,
) {
    let first_unfinalized = current_finalized_block + 1;
    // `split_off` returns the entries from `first_unfinalized` upwards. Overwriting
    // `*tasks` with them drops the older entries, and `RemoteHandle` cancels the task
    // on drop.
    *tasks = tasks.split_off(&first_unfinalized);
}
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
// Builds the messages to send to approval distribution from what the database holds.
//
// The first message is always `NewBlocks`, describing every block that has a block
// entry. It is followed, per candidate, by a `DistributeAssignment` for each local
// assignment and a `DistributeApproval` when a local approval signature exists as well.
// Blocks, candidates or approval entries missing from the database are logged and
// skipped; database errors are returned.
fn distribution_messages_for_activation<'a>(
    db: impl DBReader + 'a,
) -> SubsystemResult<Vec<ApprovalDistributionMessage>> {
    let block_hashes = db.load_all_blocks()?;
    let mut block_metas = Vec::with_capacity(block_hashes.len());

    // Index 0 is a placeholder, overwritten with the real `NewBlocks` message at the end.
    let mut messages = vec![ApprovalDistributionMessage::NewBlocks(Vec::new())];

    for block_hash in block_hashes {
        let block_entry = if let Some(entry) = db.load_block_entry(&block_hash)? {
            entry
        } else {
            tracing::warn!(
                target: LOG_TARGET,
                ?block_hash,
                "Missing block entry",
            );
            continue;
        };

        block_metas.push(BlockApprovalMeta {
            hash: block_hash,
            number: block_entry.block_number(),
            parent_hash: block_entry.parent_hash(),
            candidates: block_entry.candidates().iter().map(|(_, c_hash)| *c_hash).collect(),
            slot: block_entry.slot(),
        });

        for (i, (_, candidate_hash)) in block_entry.candidates().iter().enumerate() {
            let candidate_entry = if let Some(entry) = db.load_candidate_entry(candidate_hash)? {
                entry
            } else {
                tracing::warn!(
                    target: LOG_TARGET,
                    ?block_hash,
                    ?candidate_hash,
                    "Missing candidate entry",
                );
                continue;
            };

            let approval_entry = match candidate_entry.approval_entry(&block_hash) {
                Some(entry) => entry,
                None => {
                    tracing::warn!(
                        target: LOG_TARGET,
                        ?block_hash,
                        ?candidate_hash,
                        "Missing approval entry",
                    );
                    continue;
                }
            };

            let (local_assignment, local_approval) = approval_entry.local_statements();

            // Without a local assignment there is nothing to distribute; an approval
            // without an assignment is an impossible case and is ignored as well.
            let assignment = match local_assignment {
                Some(assignment) => assignment,
                None => continue,
            };

            messages.push(ApprovalDistributionMessage::DistributeAssignment(
                IndirectAssignmentCert {
                    block_hash,
                    validator: assignment.validator_index(),
                    cert: assignment.cert().clone(),
                },
                i as _,
            ));

            if let Some(approval_sig) = local_approval {
                messages.push(ApprovalDistributionMessage::DistributeApproval(
                    IndirectSignedApprovalVote {
                        block_hash,
                        candidate_index: i as _,
                        validator: assignment.validator_index(),
                        signature: approval_sig,
                    }
                ));
            }
        }
    }

    messages[0] = ApprovalDistributionMessage::NewBlocks(block_metas);
    Ok(messages)
}
// Handle an incoming signal from the overseer. Returns true if execution should conclude.
async fn handle_from_overseer(
ctx: &mut impl SubsystemContext,
state: &mut State<impl DBReader>,
db_writer: &dyn KeyValueDB,
x: FromOverseer<ApprovalVotingMessage>,
last_finalized_height: &mut Option<BlockNumber>,
) -> SubsystemResult<Vec<Action>> {
let actions = match x {
FromOverseer::Signal(OverseerSignal::ActiveLeaves(update)) => {
let mut actions = Vec::new();
for activated in update.activated {
let head = activated.hash;
match import::handle_new_head(
ctx,
state,
db_writer,
head,
&*last_finalized_height,
).await {
Err(e) => return Err(SubsystemError::with_origin("db", e)),
Ok(block_imported_candidates) => {
// Schedule wakeups for all imported candidates.
for block_batch in block_imported_candidates {
tracing::debug!(
target: LOG_TARGET,
block_hash = ?block_batch.block_hash,
num_candidates = block_batch.imported_candidates.len(),
"Imported new block.",
for (c_hash, c_entry) in block_batch.imported_candidates {
let our_tranche = c_entry
.approval_entry(&block_batch.block_hash)
.and_then(|a| a.our_assignment().map(|a| a.tranche()));
if let Some(our_tranche) = our_tranche {
let tick = our_tranche as Tick + block_batch.block_tick;
tracing::trace!(
target: LOG_TARGET,
tranche = our_tranche,
candidate_hash = ?c_hash,
block_hash = ?block_batch.block_hash,
block_tick = block_batch.block_tick,
"Scheduling first wakeup.",
// Our first wakeup will just be the tranche of our assignment,
// if any. This will likely be superseded by incoming assignments
// and approvals which trigger rescheduling.
actions.push(Action::ScheduleWakeup {
block_hash: block_batch.block_hash,
block_number: block_batch.block_number,
});
}
}
}
}
}
}
actions
}
FromOverseer::Signal(OverseerSignal::BlockFinalized(block_hash, block_number)) => {
*last_finalized_height = Some(block_number);
approval_db::v1::canonicalize(db_writer, &db_config, block_number, block_hash)
.map_err(|e| SubsystemError::with_origin("db", e))?;
wakeups.prune_finalized_wakeups(block_number);
Vec::new()
}
FromOverseer::Signal(OverseerSignal::Conclude) => {
vec![Action::Conclude]
}
FromOverseer::Communication { msg } => match msg {
ApprovalVotingMessage::CheckAndImportAssignment(a, claimed_core, res) => {
let (check_outcome, actions)
= check_and_import_assignment(state, a, claimed_core)?;
let _ = res.send(check_outcome);
actions
}
ApprovalVotingMessage::CheckAndImportApproval(a, res) => {
check_and_import_approval(state, metrics, a, |r| { let _ = res.send(r); })?.0
}
ApprovalVotingMessage::ApprovedAncestor(target, lower_bound, res ) => {
match handle_approved_ancestor(ctx, &state.db, target, lower_bound, wakeups).await {