Newer
Older
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Implements a `CandidateBackingSubsystem`.
#![deny(unused_crate_dependencies)]
use std::collections::{HashMap, HashSet};
use std::convert::TryFrom;
use std::pin::Pin;
use futures::{channel::{mpsc, oneshot}, Future, FutureExt, SinkExt, StreamExt};
use sp_keystore::SyncCryptoStorePtr;
CommittedCandidateReceipt, BackedCandidate, Id as ParaId, ValidatorId,
ValidatorIndex, SigningContext, PoV, CandidateHash,
CandidateDescriptor, AvailableData, ValidatorSignature, Hash, CandidateReceipt,
CoreState, CoreIndex, CollatorId, ValidityAttestation, CandidateCommitments,
FromTableMisbehavior, Statement, SignedFullStatement, MisbehaviorReport, ValidationResult,
jaeger::{self, JaegerSpan},
messages::{
AllMessages, AvailabilityStoreMessage, CandidateBackingMessage, CandidateSelectionMessage,
CandidateValidationMessage, PoVDistributionMessage, ProvisionableData,
ProvisionerMessage, StatementDistributionMessage, ValidationFailed, RuntimeApiRequest,
Peter Goodspeed-Niklaus
committed
};
use polkadot_node_subsystem_util::{
self as util,
request_session_index_for_child,
request_validator_groups,
request_validators,
request_from_runtime,
Validator,
};
use statement_table::{
generic::AttestedCandidate as TableAttestedCandidate,
Context as TableContextTrait,
Table,
v1::{
Statement as TableStatement,
SignedStatement as TableSignedStatement, Summary as TableSummary,
},
/// Target/subsystem label attached to the `tracing` events and instrumented
/// spans emitted from this file.
const LOG_TARGET: &str = "candidate_backing";
#[error("Candidate is not found")]
#[error("Failed to send candidates {0:?}")]
Send(Vec<BackedCandidate>),
#[error("FetchPoV channel closed before receipt")]
FetchPoV(#[source] oneshot::Canceled),
#[error("ValidateFromChainState channel closed before receipt")]
ValidateFromChainState(#[source] oneshot::Canceled),
#[error("StoreAvailableData channel closed before receipt")]
StoreAvailableData(#[source] oneshot::Canceled),
#[error("a channel was closed before receipt in try_join!")]
JoinMultiple(#[source] oneshot::Canceled),
#[error("Obtaining erasure chunks failed")]
ObtainErasureChunks(#[from] erasure_coding::Error),
#[error(transparent)]
ValidationFailed(#[from] ValidationFailed),
#[error(transparent)]
Mpsc(#[from] mpsc::SendError),
#[error(transparent)]
UtilError(#[from] util::Error),
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
/// Result of a background validation, tagged with the action the job should
/// take once that result is available.
enum ValidatedCandidateCommand {
/// We were instructed to second the candidate.
Second(BackgroundValidationResult),
/// We were instructed to validate the candidate.
Attest(BackgroundValidationResult),
}
/// Compact debug output: the variant name followed by the candidate hash,
/// rather than the full validation payload.
impl std::fmt::Debug for ValidatedCandidateCommand {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let variant = match self {
            ValidatedCandidateCommand::Second(_) => "Second",
            ValidatedCandidateCommand::Attest(_) => "Attest",
        };
        write!(f, "{}({})", variant, self.candidate_hash())
    }
}
impl ValidatedCandidateCommand {
fn candidate_hash(&self) -> CandidateHash {
match *self {
ValidatedCandidateCommand::Second(Ok((ref candidate, _, _))) => candidate.hash(),
ValidatedCandidateCommand::Second(Err(ref candidate)) => candidate.hash(),
ValidatedCandidateCommand::Attest(Ok((ref candidate, _, _))) => candidate.hash(),
ValidatedCandidateCommand::Attest(Err(ref candidate)) => candidate.hash(),
}
}
}
/// Holds all data needed for candidate backing job operation.
struct CandidateBackingJob {
/// The hash of the relay parent on top of which this job is doing its work.
parent: Hash,
// NOTE(review): an "outbound message channel sending part" was documented here, and
// methods on this type use `self.tx_from` and `self.metrics`, but neither field is
// declared in this definition as shown — confirm against the full file.
/// The `ParaId` assigned to this validator.
assignment: Option<ParaId>,
/// The collator required to author the candidate, if any.
required_collator: Option<CollatorId>,
/// Spans for all candidates that are not yet backable.
unbacked_candidates: HashMap<CandidateHash, JaegerSpan>,
/// We issued `Seconded`, `Valid` or `Invalid` statements about these candidates.
issued_statements: HashSet<CandidateHash>,
/// These candidates are undergoing validation in the background.
awaiting_validation: HashSet<CandidateHash>,
/// `Some(h)` if this job has already issued a `Seconded` statement for some candidate with hash `h`.
seconded: Option<CandidateHash>,
/// The candidates that are includable, by hash. Each entry here indicates
/// that we've sent the provisioner the backed candidate.
backed: HashSet<CandidateHash>,
/// We have already reported misbehaviors for these validators.
reported_misbehavior_for: HashSet<ValidatorIndex>,
/// Keystore handle passed to the validator when signing statements.
keystore: SyncCryptoStorePtr,
/// Statement table into which signed statements are imported.
table: Table<TableContext>,
/// Context handed to the table on every import/query.
table_context: TableContext,
/// Receiving end for results of background validation tasks.
background_validation: mpsc::Receiver<ValidatedCandidateCommand>,
/// Sending end cloned into each background validation task so it can report back.
background_validation_tx: mpsc::Sender<ValidatedCandidateCommand>,
}
/// Number of votes that constitutes a quorum in a group of `n_validators`:
/// strictly more than half of the group (so `1` for an empty or single-member group).
const fn group_quorum(n_validators: usize) -> usize {
    let half = n_validators / 2;
    half + 1
}
/// Context supplied to the statement table: who the validators are, how they
/// are grouped per para, and the signing context statements are checked against.
// NOTE(review): other code in this file reads `table_context.validator`, which is not
// declared in this definition as shown — confirm against the full file.
#[derive(Default)]
struct TableContext {
/// Signing context used when checking statement signatures and building misbehavior reports.
signing_context: SigningContext,
/// Validator indices assigned to each para.
groups: HashMap<ParaId, Vec<ValidatorIndex>>,
/// Validator public keys; looked up by validator index elsewhere in this file.
validators: Vec<ValidatorId>,
}
impl TableContextTrait for TableContext {
    type GroupId = ParaId;
    type Signature = ValidatorSignature;
    type Candidate = CommittedCandidateReceipt;

    /// The digest of a candidate is its hash.
    fn candidate_digest(candidate: &CommittedCandidateReceipt) -> CandidateHash {
        candidate.hash()
    }

    /// A candidate belongs to the group of the para named in its descriptor.
    fn candidate_group(candidate: &CommittedCandidateReceipt) -> ParaId {
        candidate.descriptor().para_id
    }

    /// Whether `authority` is listed in `group`; an unknown group has no members.
    fn is_member_of(&self, authority: &ValidatorIndex, group: &ParaId) -> bool {
        match self.groups.get(group) {
            Some(members) => members.iter().any(|member| member == authority),
            None => false,
        }
    }

    /// Votes required for `group` to reach quorum. An unknown group yields
    /// `usize::max_value()`, i.e. a threshold that can never be met.
    fn requisite_votes(&self, group: &ParaId) -> usize {
        match self.groups.get(group) {
            Some(members) => group_quorum(members.len()),
            None => usize::max_value(),
        }
    }
}
asynchronous rob
committed
/// Marker returned by `make_pov_available` when the erasure root computed from
/// the available data differs from the expected one.
struct InvalidErasureRoot;
// Converts a signed primitive statement into the statement table's own signed
// statement type. Written as a free function because an `impl From` does not
// appear to be possible given the current state of the code.
fn primitive_statement_to_table(s: &SignedFullStatement) -> TableSignedStatement {
    let sender = s.validator_index();
    let signature = s.signature().clone();
    let statement = match s.payload() {
        Statement::Seconded(receipt) => TableStatement::Candidate(receipt.clone()),
        Statement::Valid(hash) => TableStatement::Valid(hash.clone()),
        Statement::Invalid(hash) => TableStatement::Invalid(hash.clone()),
    };

    TableSignedStatement { statement, signature, sender }
}
#[tracing::instrument(level = "trace", skip(attested, table_context), fields(subsystem = LOG_TARGET))]
fn table_attested_to_backed(
attested: TableAttestedCandidate<
ParaId,
CommittedCandidateReceipt,
ValidatorIndex,
ValidatorSignature,
>,
table_context: &TableContext,
) -> Option<BackedCandidate> {
let TableAttestedCandidate { candidate, validity_votes, group_id: para_id } = attested;
let (ids, validity_votes): (Vec<_>, Vec<ValidityAttestation>) = validity_votes
.into_iter()
.map(|(id, vote)| (id, vote.into()))
.unzip();
let group = table_context.groups.get(¶_id)?;
let mut validator_indices = BitVec::with_capacity(group.len());
validator_indices.resize(group.len(), false);
// The order of the validity votes in the backed candidate must match
// the order of bits set in the bitfield, which is not necessarily
// the order of the `validity_votes` we got from the table.
let mut vote_positions = Vec::with_capacity(validity_votes.len());
for (orig_idx, id) in ids.iter().enumerate() {
if let Some(position) = group.iter().position(|x| x == id) {
validator_indices.set(position, true);
vote_positions.push((orig_idx, position));
} else {
tracing::warn!(
target: LOG_TARGET,
"Logic error: Validity vote from table does not correspond to group",
);
return None;
vote_positions.sort_by_key(|(_orig, pos_in_group)| *pos_in_group);
validity_votes: vote_positions.into_iter()
.map(|(pos_in_votes, _pos_in_group)| validity_votes[pos_in_votes].clone())
.collect(),
async fn store_available_data(
tx_from: &mut mpsc::Sender<FromJobCommand>,
id: Option<ValidatorIndex>,
n_validators: u32,
candidate_hash: CandidateHash,
available_data: AvailableData,
) -> Result<(), Error> {
let (tx, rx) = oneshot::channel();
tx_from.send(AllMessages::AvailabilityStore(
AvailabilityStoreMessage::StoreAvailableData(
candidate_hash,
id,
n_validators,
available_data,
tx,
)
).into()
).await?;
let _ = rx.await.map_err(Error::StoreAvailableData)?;
Ok(())
}
// Make a `PoV` available.
//
// This will compute the erasure root internally and compare it to the expected erasure root.
// This returns `Err()` iff there is an internal error. Otherwise, it returns either `Ok(Ok(()))` or `Ok(Err(_))`.
#[tracing::instrument(level = "trace", skip(tx_from, pov, span), fields(subsystem = LOG_TARGET))]
async fn make_pov_available(
tx_from: &mut mpsc::Sender<FromJobCommand>,
validator_index: Option<ValidatorIndex>,
n_validators: usize,
pov: Arc<PoV>,
candidate_hash: CandidateHash,
validation_data: polkadot_primitives::v1::PersistedValidationData,
expected_erasure_root: Hash,
span: Option<&JaegerSpan>,
) -> Result<Result<(), InvalidErasureRoot>, Error> {
let available_data = AvailableData {
pov,
validation_data,
};
{
let _span = span.as_ref().map(|s| s.child("erasure-coding"));
let chunks = erasure_coding::obtain_chunks_v1(
n_validators,
&available_data,
)?;
let branches = erasure_coding::branches(chunks.as_ref());
let erasure_root = branches.root();
if erasure_root != expected_erasure_root {
return Ok(Err(InvalidErasureRoot));
}
{
let _span = span.as_ref().map(|s| s.child("store-data"));
store_available_data(
tx_from,
validator_index,
n_validators as u32,
candidate_hash,
available_data,
).await?;
}
Ok(Ok(()))
}
async fn request_pov_from_distribution(
tx_from: &mut mpsc::Sender<FromJobCommand>,
parent: Hash,
descriptor: CandidateDescriptor,
) -> Result<Arc<PoV>, Error> {
let (tx, rx) = oneshot::channel();
tx_from.send(AllMessages::PoVDistribution(
PoVDistributionMessage::FetchPoV(parent, descriptor, tx)
).into()).await?;
rx.await.map_err(Error::FetchPoV)
}
async fn request_candidate_validation(
tx_from: &mut mpsc::Sender<FromJobCommand>,
candidate: CandidateDescriptor,
pov: Arc<PoV>,
) -> Result<ValidationResult, Error> {
let (tx, rx) = oneshot::channel();
tx_from.send(AllMessages::CandidateValidation(
CandidateValidationMessage::ValidateFromChainState(
candidate,
pov,
tx,
)
).into()
).await?;
match rx.await {
Ok(Ok(validation_result)) => Ok(validation_result),
Ok(Err(err)) => Err(Error::ValidationFailed(err)),
Err(err) => Err(Error::ValidateFromChainState(err)),
}
}
/// Outcome of validating a candidate in the background: `Ok` carries the
/// receipt together with its commitments and `PoV`; `Err` carries only the
/// receipt of a candidate that did not pass.
type BackgroundValidationResult = Result<(CandidateReceipt, CandidateCommitments, Arc<PoV>), CandidateReceipt>;
struct BackgroundValidationParams<F> {
tx_from: mpsc::Sender<FromJobCommand>,
tx_command: mpsc::Sender<ValidatedCandidateCommand>,
candidate: CandidateReceipt,
relay_parent: Hash,
pov: Option<Arc<PoV>>,
validator_index: Option<ValidatorIndex>,
n_validators: usize,
asynchronous rob
committed
span: Option<JaegerSpan>,
make_command: F,
}
async fn validate_and_make_available(
params: BackgroundValidationParams<impl Fn(BackgroundValidationResult) -> ValidatedCandidateCommand>,
) -> Result<(), Error> {
let BackgroundValidationParams {
mut tx_from,
mut tx_command,
candidate,
relay_parent,
pov,
validator_index,
n_validators,
make_command,
} = params;
let pov = match pov {
Some(pov) => pov,
asynchronous rob
committed
let _span = span.as_ref().map(|s| s.child("request-pov"));
request_pov_from_distribution(
&mut tx_from,
relay_parent,
candidate.descriptor.clone(),
).await?
}
asynchronous rob
committed
let _span = span.as_ref().map(|s| s.child("request-validation"));
request_candidate_validation(&mut tx_from, candidate.descriptor.clone(), pov.clone()).await?
};
let expected_commitments_hash = candidate.commitments_hash;
let res = match v {
ValidationResult::Valid(commitments, validation_data) => {
// If validation produces a new set of commitments, we vote the candidate as invalid.
if commitments.hash() != expected_commitments_hash {
tracing::trace!(
target: LOG_TARGET,
candidate_receipt = ?candidate,
actual_commitments = ?commitments,
"Commitments obtained with validation don't match the announced by the candidate receipt",
);
Err(candidate)
} else {
let erasure_valid = make_pov_available(
&mut tx_from,
validator_index,
n_validators,
pov.clone(),
candidate.hash(),
validation_data,
candidate.descriptor.erasure_root,
span.as_ref(),
).await?;
match erasure_valid {
Ok(()) => Ok((candidate, commitments, pov.clone())),
Err(InvalidErasureRoot) => {
tracing::trace!(
target: LOG_TARGET,
candidate_receipt = ?candidate,
actual_commitments = ?commitments,
"Erasure root doesn't match the announced by the candidate receipt",
);
Err(candidate)
},
ValidationResult::Invalid(reason) => {
tracing::trace!(
target: LOG_TARGET,
candidate_receipt = ?candidate,
reason = ?reason,
"Validation yielded an invalid candidate",
);
Err(candidate)
}
};
let command = make_command(res);
tx_command.send(command).await?;
Ok(())
}
impl CandidateBackingJob {
/// Run asynchronously.
async fn run_loop(
mut self,
mut rx_to: mpsc::Receiver<CandidateBackingMessage>,
) -> Result<(), Error> {
futures::select! {
validated_command = self.background_validation.next() => {
let _span = span.child("process-validation-result");
if let Some(c) = validated_command {
self.handle_validated_candidate_command(c).await?;
} else {
panic!("`self` hasn't dropped and `self` holds a reference to this sender; qed");
}
}
to_job = rx_to.next() => match to_job {
None => break,
Some(msg) => {
// we intentionally want spans created in `process_msg` to descend from the
// `span ` which is longer-lived than this ephemeral timing span.
let _timing_span = span.child("process-message");
self.process_msg(&span, msg).await?;
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
/// Acts on the result of a finished background validation.
///
/// The candidate is first removed from `awaiting_validation`. Then:
/// * `Second` + valid: if nothing has been seconded yet and no statement was
///   issued for this candidate, issue a `Seconded` statement and distribute the `PoV`;
/// * `Second` + invalid: report the candidate as invalid;
/// * `Attest`: if no statement was issued for this candidate yet, issue `Valid`
///   or `Invalid` according to the result.
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
async fn handle_validated_candidate_command(
    &mut self,
    command: ValidatedCandidateCommand,
) -> Result<(), Error> {
    let candidate_hash = command.candidate_hash();
    self.awaiting_validation.remove(&candidate_hash);

    match command {
        ValidatedCandidateCommand::Second(Ok((candidate, commitments, pov))) => {
            // sanity check.
            let may_second = self.seconded.is_none()
                && !self.issued_statements.contains(&candidate_hash);

            if may_second {
                self.seconded = Some(candidate_hash);
                self.issued_statements.insert(candidate_hash);
                self.metrics.on_candidate_seconded();

                let receipt = CommittedCandidateReceipt {
                    descriptor: candidate.descriptor.clone(),
                    commitments,
                };
                self.sign_import_and_distribute_statement(Statement::Seconded(receipt)).await?;
                self.distribute_pov(candidate.descriptor, pov).await?;
            }
        }
        ValidatedCandidateCommand::Second(Err(candidate)) => {
            self.issue_candidate_invalid_message(candidate).await?;
        }
        ValidatedCandidateCommand::Attest(res) => {
            // sanity check: `insert` returns `true` only if no statement had been
            // issued for this candidate before.
            if self.issued_statements.insert(candidate_hash) {
                let statement = match res {
                    Ok(_) => Statement::Valid(candidate_hash),
                    Err(_) => Statement::Invalid(candidate_hash),
                };
                self.sign_import_and_distribute_statement(statement).await?;
            }
        }
    }

    Ok(())
}
#[tracing::instrument(level = "trace", skip(self, params), fields(subsystem = LOG_TARGET))]
async fn background_validate_and_make_available(
&mut self,
params: BackgroundValidationParams<
impl Fn(BackgroundValidationResult) -> ValidatedCandidateCommand + Send + 'static
>,
) -> Result<(), Error> {
let candidate_hash = params.candidate.hash();
if self.awaiting_validation.insert(candidate_hash) {
// spawn background task.
let bg = async move {
if let Err(e) = validate_and_make_available(params).await {
tracing::error!("Failed to validate and make available: {:?}", e);
}
};
self.tx_from.send(FromJobCommand::Spawn("Backing Validation", bg.boxed())).await?;
}
Ok(())
}
async fn issue_candidate_invalid_message(
&mut self,
self.tx_from.send(AllMessages::from(CandidateSelectionMessage::Invalid(self.parent, candidate)).into()).await?;
/// Kick off background validation with intent to second.
#[tracing::instrument(level = "trace", skip(self, parent_span, pov), fields(subsystem = LOG_TARGET))]
async fn validate_and_second(
&mut self,
parent_span: &JaegerSpan,
) -> Result<(), Error> {
// Check that candidate is collated by the right collator.
if self.required_collator.as_ref()
.map_or(false, |c| c != &candidate.descriptor().collator)
{
self.issue_candidate_invalid_message(candidate.clone()).await?;
let candidate_hash = candidate.hash();
self.add_unbacked_span(&parent_span, candidate_hash);
asynchronous rob
committed
let span = self.get_unbacked_validation_child(&candidate_hash);
self.background_validate_and_make_available(BackgroundValidationParams {
tx_from: self.tx_from.clone(),
tx_command: self.background_validation_tx.clone(),
candidate: candidate.clone(),
relay_parent: self.parent,
pov: Some(pov),
validator_index: self.table_context.validator.as_ref().map(|v| v.index()),
n_validators: self.table_context.validators.len(),
make_command: ValidatedCandidateCommand::Second,
}).await?;
asynchronous rob
committed
/// Signs `statement`, imports the signed statement into our own table and then
/// distributes it.
///
/// If signing yields nothing, this is a no-op that returns `Ok(())`.
async fn sign_import_and_distribute_statement(&mut self, statement: Statement) -> Result<(), Error> {
    let signed_statement = match self.sign_statement(statement).await {
        Some(signed) => signed,
        None => return Ok(()),
    };

    self.import_statement(&signed_statement).await?;
    self.distribute_signed_statement(signed_statement).await?;
    Ok(())
}
/// Check if there have happened any new misbehaviors and issue necessary messages.
///
/// Every validator for which the table records misbehavior, and which has not
/// been reported before, is marked as reported; if a `MisbehaviorReport` can be
/// built for it, the report is sent to the provisioner.
///
/// TODO: Report multiple misbehaviors (https://github.com/paritytech/polkadot/issues/1387)
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
async fn issue_new_misbehaviors(&mut self) -> Result<(), Error> {
    // Reports are collected first and sent afterwards, because sending needs
    // `&mut self` while the table is still borrowed by the iteration.
    let mut reports = Vec::new();

    for (k, v) in self.table.get_misbehavior().iter() {
        // `insert` returns `false` when this validator was already reported.
        if !self.reported_misbehavior_for.insert(*k) {
            continue;
        }

        let f = FromTableMisbehavior {
            id: *k,
            report: v.clone(),
            signing_context: self.table_context.signing_context.clone(),
            key: self.table_context.validators[*k as usize].clone(),
        };

        if let Ok(report) = MisbehaviorReport::try_from(f) {
            // `ProvisionableData` messages carry the relay parent as their first
            // field, as for backed candidates elsewhere in this file.
            let message = ProvisionerMessage::ProvisionableData(
                self.parent,
                ProvisionableData::MisbehaviorReport(self.parent, report),
            );
            reports.push(message);
        }
    }

    for report in reports {
        self.send_to_provisioner(report).await?;
    }

    Ok(())
}
/// Import a statement into the statement table and return the summary of the import.
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
async fn import_statement(
&mut self,
statement: &SignedFullStatement,
) -> Result<Option<TableSummary>, Error> {
asynchronous rob
committed
// create a span only for candidates we're already aware of.
let candidate_hash = statement.payload().candidate_hash();
self.get_unbacked_statement_child(&candidate_hash, statement.validator_index())
};
let stmt = primitive_statement_to_table(statement);
let summary = self.table.import_statement(&self.table_context, stmt);
let unbacked_span = if let Some(attested) = summary.as_ref()
.and_then(|s| self.table.attested_candidate(&s.candidate, &self.table_context))
{
let candidate_hash = attested.candidate.hash();
// `HashSet::insert` returns true if the thing wasn't in there already.
if self.backed.insert(candidate_hash) {
let span = self.remove_unbacked_span(&candidate_hash);
if let Some(backed) =
table_attested_to_backed(attested, &self.table_context)
{
let message = ProvisionerMessage::ProvisionableData(
self.parent,
ProvisionableData::BackedCandidate(backed.receipt()),
);
self.send_to_provisioner(message).await?;
span.as_ref().map(|s| s.child("backed"));
span
} else {
None
self.issue_new_misbehaviors().await?;
// It is important that the child span is dropped before its parent span (`unbacked_span`)
drop(import_statement_span);
drop(unbacked_span);
#[tracing::instrument(level = "trace", skip(self, span), fields(subsystem = LOG_TARGET))]
async fn process_msg(&mut self, span: &JaegerSpan, msg: CandidateBackingMessage) -> Result<(), Error> {
match msg {
CandidateBackingMessage::Second(_, candidate, pov) => {
let _timer = self.metrics.time_process_second();
// Sanity check that candidate is from our assignment.
if Some(candidate.descriptor().para_id) != self.assignment {
return Ok(());
}
// If the message is a `CandidateBackingMessage::Second`, sign and dispatch a
// Seconded statement only if we have not seconded any other candidate and
// have not signed a Valid statement for the requested candidate.
// This job has not seconded a candidate yet.
if !self.issued_statements.contains(&candidate_hash) {
self.validate_and_second(&span, &candidate, pov.clone()).await?;
}
}
}
CandidateBackingMessage::Statement(_, statement) => {
let _timer = self.metrics.time_process_statement();
self.check_statement_signature(&statement)?;
match self.maybe_validate_and_import(&span, statement).await {
Err(Error::ValidationFailed(_)) => return Ok(()),
Err(e) => return Err(e),
Ok(()) => (),
}
CandidateBackingMessage::GetBackedCandidates(_, requested_candidates, tx) => {
let _timer = self.metrics.time_get_backed_candidates();
let backed = requested_candidates
.into_iter()
.filter_map(|hash| {
self.table.attested_candidate(&hash, &self.table_context)
.and_then(|attested| table_attested_to_backed(attested, &self.table_context))
})
.collect();
tx.send(backed).map_err(|data| Error::Send(data))?;
}
}
Ok(())
}
/// Kick off validation work and distribute the result as a signed statement.
#[tracing::instrument(level = "trace", skip(self, span), fields(subsystem = LOG_TARGET))]
async fn kick_off_validation_work(
&mut self,
summary: TableSummary,
asynchronous rob
committed
span: Option<JaegerSpan>,
let candidate_hash = summary.candidate;
if self.issued_statements.contains(&candidate_hash) {
return Ok(())
}
// We clone the commitments here because there are borrowck
// errors relating to this being a struct and methods borrowing the entirety of self
// and not just those things that the function uses.
let candidate = self.table.get_candidate(&candidate_hash).ok_or(Error::CandidateNotFound)?.to_plain();
let descriptor = candidate.descriptor().clone();
// Check that candidate is collated by the right collator.
if self.required_collator.as_ref()
.map_or(false, |c| c != &descriptor.collator)
{
// If not, we've got the statement in the table but we will
// not issue validation work for it.
//
// Act as though we've issued a statement.
self.issued_statements.insert(candidate_hash);
return Ok(());
}
self.background_validate_and_make_available(BackgroundValidationParams {
tx_from: self.tx_from.clone(),
tx_command: self.background_validation_tx.clone(),
relay_parent: self.parent,
pov: None,
validator_index: self.table_context.validator.as_ref().map(|v| v.index()),
n_validators: self.table_context.validators.len(),
make_command: ValidatedCandidateCommand::Attest,
}).await
}
/// Import the statement and kick off validation work if it is a part of our assignment.
///
/// Validation is only started for `Seconded` statements whose import produced a
/// summary; an unbacked span is registered for the candidate in that case, and
/// validation itself is limited to candidates of the group we are assigned to.
#[tracing::instrument(level = "trace", skip(self, parent_span), fields(subsystem = LOG_TARGET))]
async fn maybe_validate_and_import(
    &mut self,
    parent_span: &JaegerSpan,
    statement: SignedFullStatement,
) -> Result<(), Error> {
    if let Some(summary) = self.import_statement(&statement).await? {
        if let Statement::Seconded(_) = statement.payload() {
            self.add_unbacked_span(parent_span, summary.candidate);
            if Some(summary.group_id) == self.assignment {
                let span = self.get_unbacked_validation_child(&summary.candidate);

                self.kick_off_validation_work(summary, span).await?;
            }
        }
    }

    Ok(())
}
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
async fn sign_statement(&self, statement: Statement) -> Option<SignedFullStatement> {
let signed = self.table_context
.validator
.as_ref()?
.sign(self.keystore.clone(), statement)
.await
.ok()?;
self.metrics.on_statement_signed();
Some(signed)
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
fn check_statement_signature(&self, statement: &SignedFullStatement) -> Result<(), Error> {
let idx = statement.validator_index() as usize;
if self.table_context.validators.len() > idx {
statement.check_signature(
&self.table_context.signing_context,
&self.table_context.validators[idx],
).map_err(|_| Error::InvalidSignature)?;
} else {
return Err(Error::InvalidSignature);
}
Ok(())
}
/// Registers a tracing span for the candidate `hash` as a child of `parent_span`.
///
/// Does nothing if the candidate is already considered backed; an existing span
/// for the same candidate is kept rather than replaced.
fn add_unbacked_span(&mut self, parent_span: &JaegerSpan, hash: CandidateHash) {
    if !self.backed.contains(&hash) {
        // only add if we don't consider this backed.
        self.unbacked_candidates.entry(hash).or_insert_with(|| {
            let mut span = parent_span.child("unbacked-candidate");
            span.add_string_tag("candidate-hash", &format!("{:?}", hash.0));
            span
        });
    }
}
/// Creates a "validation" child span under the unbacked span of candidate
/// `hash`, or returns `None` if no such span is tracked.
fn get_unbacked_validation_child(&self, hash: &CandidateHash) -> Option<JaegerSpan> {
    let parent = self.unbacked_candidates.get(hash)?;
    Some(parent.child("validation"))
}
asynchronous rob
committed
/// Creates an "import-statement" child span, tagged with the validator index,
/// under the unbacked span of candidate `hash`. Returns `None` if no span is
/// tracked for that candidate.
fn get_unbacked_statement_child(&self, hash: &CandidateHash, validator: ValidatorIndex) -> Option<JaegerSpan> {
    let parent = self.unbacked_candidates.get(hash)?;

    let mut child = parent.child("import-statement");
    let validator_tag = format!("{}", validator);
    child.add_string_tag("validator-index", &validator_tag);
    Some(child)
}
fn remove_unbacked_span(&mut self, hash: &CandidateHash) -> Option<JaegerSpan> {
self.unbacked_candidates.remove(hash)
async fn send_to_provisioner(&mut self, msg: ProvisionerMessage) -> Result<(), Error> {
self.tx_from.send(AllMessages::from(msg).into()).await?;
async fn distribute_pov(
&mut self,
descriptor: CandidateDescriptor,
pov: Arc<PoV>,
) -> Result<(), Error> {
PoVDistributionMessage::DistributePoV(self.parent, descriptor, pov),
async fn distribute_signed_statement(&mut self, s: SignedFullStatement) -> Result<(), Error> {
let smsg = StatementDistributionMessage::Share(self.parent, s);
self.tx_from.send(AllMessages::from(smsg).into()).await?;
impl util::JobTrait for CandidateBackingJob {
type RunArgs = SyncCryptoStorePtr;
const NAME: &'static str = "CandidateBackingJob";
#[tracing::instrument(skip(keystore, metrics, rx_to, tx_from), fields(subsystem = LOG_TARGET))]
keystore: SyncCryptoStorePtr,
rx_to: mpsc::Receiver<Self::ToJob>,
mut tx_from: mpsc::Sender<FromJobCommand>,
) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>> {
async move {
asynchronous rob
committed
macro_rules! try_runtime_api {
($x: expr) => {
match $x {
Ok(x) => x,
Err(e) => {
err = ?e,
"Failed to fetch runtime API data for job",
asynchronous rob
committed
);
// We can't do candidate validation work if we don't have the
// requisite runtime API data. But these errors should not take
// down the node.
return Ok(());
}
}
}
}
let span = jaeger::hash_span(&parent, "run:backing");
let _span = span.child("runtime-apis");
asynchronous rob
committed
let (validators, groups, session_index, cores) = futures::try_join!(
try_runtime_api!(request_validators(parent, &mut tx_from).await),
try_runtime_api!(request_validator_groups(parent, &mut tx_from).await),
try_runtime_api!(request_session_index_for_child(parent, &mut tx_from).await),
try_runtime_api!(request_from_runtime(
asynchronous rob
committed
parent,
&mut tx_from,
|tx| RuntimeApiRequest::AvailabilityCores(tx),
).await),
).map_err(Error::JoinMultiple)?;
asynchronous rob
committed
let validators = try_runtime_api!(validators);
let (validator_groups, group_rotation_info) = try_runtime_api!(groups);
let session_index = try_runtime_api!(session_index);
let cores = try_runtime_api!(cores);
let _span = span.child("validator-construction");
asynchronous rob
committed
let signing_context = SigningContext { parent_hash: parent, session_index };
let validator = match Validator::construct(
&validators,
signing_context,
keystore.clone(),
).await {
Ok(v) => v,
Err(util::Error::NotAValidator) => { return Ok(()) },
Err(e) => {
err = ?e,
"Cannot participate in candidate backing",
);
return Ok(())
}
};
let _span = span.child("calc-validator-groups");