// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Implements a `CandidateBackingSubsystem`.
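//!
//! The subsystem seconds candidates for the para assigned to the local validator, imports
//! statements about candidates from other validators into a statement table, and hands
//! candidates that gather enough validity votes to the provisioner as backed candidates.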
#![deny(unused_crate_dependencies)]
use std::collections::{HashMap, HashSet};
use std::convert::TryFrom;
use std::pin::Pin;
use std::sync::Arc;
use bitvec::vec::BitVec;
use futures::{
channel::{mpsc, oneshot},
	Future, FutureExt, SinkExt, StreamExt,
};

use sp_keystore::SyncCryptoStorePtr;
use polkadot_primitives::v1::{
CommittedCandidateReceipt, BackedCandidate, Id as ParaId, ValidatorId,
ValidatorIndex, SigningContext, PoV,
CandidateDescriptor, AvailableData, ValidatorSignature, Hash, CandidateReceipt,
CandidateCommitments, CoreState, CoreIndex, CollatorId, ValidationOutputs,
};
use polkadot_node_primitives::{
	FromTableMisbehavior, Statement, SignedFullStatement, MisbehaviorReport, ValidationResult,
};
use polkadot_erasure_coding as erasure_coding;
use polkadot_subsystem::{
	messages::{
AllMessages, AvailabilityStoreMessage, CandidateBackingMessage, CandidateSelectionMessage,
CandidateValidationMessage, NewBackedCandidate, PoVDistributionMessage, ProvisionableData,
ProvisionerMessage, RuntimeApiMessage, StatementDistributionMessage, ValidationFailed,
		RuntimeApiRequest,
	},
};
use polkadot_node_subsystem_util::{
self as util,
request_session_index_for_child,
request_validator_groups,
request_validators,
request_from_runtime,
	Validator,
	delegated_subsystem,
	metrics::{self, prometheus},
};
use statement_table::{
generic::AttestedCandidate as TableAttestedCandidate,
Context as TableContextTrait,
Table,
v1::{
Statement as TableStatement,
SignedStatement as TableSignedStatement, Summary as TableSummary,
	},
};

/// Errors that can occur in the candidate backing job.
#[derive(Debug, thiserror::Error)]
pub enum Error {
	#[error("Candidate is not found")]
	CandidateNotFound,
	#[error("Signature is invalid")]
	InvalidSignature,
#[error("Failed to send candidates {0:?}")]
Send(Vec<NewBackedCandidate>),
#[error("Oneshot never resolved")]
Oneshot(#[from] #[source] oneshot::Canceled),
#[error("Obtaining erasure chunks failed")]
ObtainErasureChunks(#[from] #[source] erasure_coding::Error),
#[error(transparent)]
ValidationFailed(#[from] ValidationFailed),
#[error(transparent)]
Mpsc(#[from] mpsc::SendError),
#[error(transparent)]
UtilError(#[from] util::Error),
}
/// Holds all data needed for candidate backing job operation.
struct CandidateBackingJob {
	/// The hash of the relay parent on top of which this job is doing its work.
parent: Hash,
/// Inbound message channel receiving part.
rx_to: mpsc::Receiver<ToJob>,
/// Outbound message channel sending part.
tx_from: mpsc::Sender<FromJob>,
	/// The `ParaId` assigned to this validator.
	assignment: ParaId,
/// The collator required to author the candidate, if any.
required_collator: Option<CollatorId>,
	/// We issued `Valid` or `Invalid` statements about these candidates.
issued_statements: HashSet<Hash>,
	/// `Some(h)` if this job has already issued a `Seconded` statement for the candidate with hash `h`.
seconded: Option<Hash>,
/// The candidates that are includable, by hash. Each entry here indicates
/// that we've sent the provisioner the backed candidate.
backed: HashSet<Hash>,
/// We have already reported misbehaviors for these validators.
reported_misbehavior_for: HashSet<ValidatorIndex>,
keystore: SyncCryptoStorePtr,
table: Table<TableContext>,
	table_context: TableContext,
	/// Prometheus metrics for candidate backing.
	metrics: Metrics,
}
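/// The quorum of validity votes needed within a group to back a candidate:
/// a simple majority of the group.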
const fn group_quorum(n_validators: usize) -> usize {
(n_validators / 2) + 1
}
#[derive(Default)]
struct TableContext {
	signing_context: SigningContext,
	validator: Option<Validator>,
groups: HashMap<ParaId, Vec<ValidatorIndex>>,
validators: Vec<ValidatorId>,
}
impl TableContextTrait for TableContext {
type AuthorityId = ValidatorIndex;
type Digest = Hash;
type GroupId = ParaId;
type Signature = ValidatorSignature;
type Candidate = CommittedCandidateReceipt;
fn candidate_digest(candidate: &CommittedCandidateReceipt) -> Hash {
candidate.hash()
}
fn candidate_group(candidate: &CommittedCandidateReceipt) -> ParaId {
candidate.descriptor().para_id
}
fn is_member_of(&self, authority: &ValidatorIndex, group: &ParaId) -> bool {
self.groups.get(group).map_or(false, |g| g.iter().position(|a| a == authority).is_some())
}
fn requisite_votes(&self, group: &ParaId) -> usize {
self.groups.get(group).map_or(usize::max_value(), |g| group_quorum(g.len()))
}
}
/// A message type that is sent from `CandidateBackingSubsystem` to `CandidateBackingJob`.
pub enum ToJob {
/// A `CandidateBackingMessage`.
CandidateBacking(CandidateBackingMessage),
/// Stop working.
Stop,
}
impl TryFrom<AllMessages> for ToJob {
type Error = ();
fn try_from(msg: AllMessages) -> Result<Self, Self::Error> {
match msg {
AllMessages::CandidateBacking(msg) => Ok(ToJob::CandidateBacking(msg)),
_ => Err(()),
		}
	}
}

impl From<CandidateBackingMessage> for ToJob {
fn from(msg: CandidateBackingMessage) -> Self {
		Self::CandidateBacking(msg)
	}
}

impl util::ToJobTrait for ToJob {
const STOP: Self = ToJob::Stop;
fn relay_parent(&self) -> Option<Hash> {
match self {
Self::CandidateBacking(cb) => cb.relay_parent(),
Self::Stop => None,
}
}
}
/// A message type that is sent from `CandidateBackingJob` to `CandidateBackingSubsystem`.
enum FromJob {
AvailabilityStore(AvailabilityStoreMessage),
RuntimeApiMessage(RuntimeApiMessage),
CandidateValidation(CandidateValidationMessage),
CandidateSelection(CandidateSelectionMessage),
Provisioner(ProvisionerMessage),
PoVDistribution(PoVDistributionMessage),
StatementDistribution(StatementDistributionMessage),
}
impl From<FromJob> for AllMessages {
fn from(f: FromJob) -> Self {
match f {
FromJob::AvailabilityStore(msg) => AllMessages::AvailabilityStore(msg),
FromJob::RuntimeApiMessage(msg) => AllMessages::RuntimeApi(msg),
FromJob::CandidateValidation(msg) => AllMessages::CandidateValidation(msg),
FromJob::CandidateSelection(msg) => AllMessages::CandidateSelection(msg),
FromJob::StatementDistribution(msg) => AllMessages::StatementDistribution(msg),
FromJob::PoVDistribution(msg) => AllMessages::PoVDistribution(msg),
FromJob::Provisioner(msg) => AllMessages::Provisioner(msg),
}
}
}
impl TryFrom<AllMessages> for FromJob {
type Error = &'static str;
fn try_from(f: AllMessages) -> Result<Self, Self::Error> {
match f {
AllMessages::AvailabilityStore(msg) => Ok(FromJob::AvailabilityStore(msg)),
AllMessages::RuntimeApi(msg) => Ok(FromJob::RuntimeApiMessage(msg)),
AllMessages::CandidateValidation(msg) => Ok(FromJob::CandidateValidation(msg)),
AllMessages::CandidateSelection(msg) => Ok(FromJob::CandidateSelection(msg)),
AllMessages::StatementDistribution(msg) => Ok(FromJob::StatementDistribution(msg)),
AllMessages::PoVDistribution(msg) => Ok(FromJob::PoVDistribution(msg)),
AllMessages::Provisioner(msg) => Ok(FromJob::Provisioner(msg)),
_ => Err("can't convert this AllMessages variant to FromJob"),
}
}
}
// It looks like it's not possible to do an `impl From` given the current state of
// the code. So this does the necessary conversion.
fn primitive_statement_to_table(s: &SignedFullStatement) -> TableSignedStatement {
let statement = match s.payload() {
Statement::Seconded(c) => TableStatement::Candidate(c.clone()),
Statement::Valid(h) => TableStatement::Valid(h.clone()),
Statement::Invalid(h) => TableStatement::Invalid(h.clone()),
};
TableSignedStatement {
statement,
signature: s.signature().clone(),
sender: s.validator_index(),
}
}
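/// Convert an attested candidate from the statement table into a `BackedCandidate`,
/// recording which members of the para's validator group provided validity votes.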
fn table_attested_to_backed(
attested: TableAttestedCandidate<
ParaId,
CommittedCandidateReceipt,
ValidatorIndex,
ValidatorSignature,
>,
table_context: &TableContext,
) -> Option<BackedCandidate> {
let TableAttestedCandidate { candidate, validity_votes, group_id: para_id } = attested;
let (ids, validity_votes): (Vec<_>, Vec<_>) = validity_votes
.into_iter()
.map(|(id, vote)| (id, vote.into()))
.unzip();
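	// Look up the para's validator group and set a bit for every group member
	// that contributed one of the `validity_votes`.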
	let group = table_context.groups.get(&para_id)?;
let mut validator_indices = BitVec::with_capacity(group.len());
validator_indices.resize(group.len(), false);
for id in ids.iter() {
if let Some(position) = group.iter().position(|x| x == id) {
validator_indices.set(position, true);
}
}
Some(BackedCandidate {
candidate,
validity_votes,
validator_indices,
})
}
impl CandidateBackingJob {
/// Run asynchronously.
async fn run_loop(mut self) -> Result<(), Error> {
while let Some(msg) = self.rx_to.next().await {
match msg {
ToJob::CandidateBacking(msg) => {
self.process_msg(msg).await?;
}
_ => break,
}
}
Ok(())
}
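	/// Report a candidate to the candidate selection subsystem as invalid.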
async fn issue_candidate_invalid_message(
		&mut self,
		candidate: CandidateReceipt,
	) -> Result<(), Error> {
self.tx_from.send(FromJob::CandidateSelection(
CandidateSelectionMessage::Invalid(self.parent, candidate)
)).await?;
Ok(())
}
/// Validate the candidate that is requested to be `Second`ed and distribute validation result.
///
/// Returns `Ok(true)` if we issued a `Seconded` statement about this candidate.
async fn validate_and_second(
&mut self,
candidate: &CandidateReceipt,
pov: PoV,
) -> Result<bool, Error> {
// Check that candidate is collated by the right collator.
if self.required_collator.as_ref()
.map_or(false, |c| c != &candidate.descriptor().collator)
{
self.issue_candidate_invalid_message(candidate.clone()).await?;
return Ok(false);
}
let valid = self.request_candidate_validation(
candidate.descriptor().clone(),
Arc::new(pov.clone()),
).await?;
let candidate_hash = candidate.hash();
let statement = match valid {
ValidationResult::Valid(outputs, validation_data) => {
// make PoV available for later distribution. Send data to the availability
// store to keep. Sign and dispatch `valid` statement to network if we
// have not seconded the given candidate.
//
// If the commitments hash produced by validation is not the same as given by
// the collator, do not make available and report the collator.
let commitments_check = self.make_pov_available(
pov,
					validation_data,
					outputs,
|commitments| if commitments.hash() == candidate.commitments_hash {
Ok(CommittedCandidateReceipt {
descriptor: candidate.descriptor().clone(),
commitments,
})
} else {
Err(())
},
).await?;
match commitments_check {
Ok(candidate) => {
self.issued_statements.insert(candidate_hash);
Some(Statement::Seconded(candidate))
}
Err(()) => {
self.issue_candidate_invalid_message(candidate.clone()).await?;
None
}
				}
			}
			ValidationResult::Invalid(_reason) => {
// no need to issue a statement about this if we aren't seconding it.
//
// there's an infinite amount of garbage out there. no need to acknowledge
// all of it.
self.issue_candidate_invalid_message(candidate.clone()).await?;
				None
			}
		};

		let issued_statement = statement.is_some();
if let Some(statement) = statement {
			self.sign_import_and_distribute_statement(statement).await?;
		}

		Ok(issued_statement)
	}
async fn sign_import_and_distribute_statement(&mut self, statement: Statement) -> Result<(), Error> {
if let Some(signed_statement) = self.sign_statement(statement).await {
self.import_statement(&signed_statement).await?;
self.distribute_signed_statement(signed_statement).await?;
}
Ok(())
}
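	/// Collect the candidates currently proposed in the table and convert those that
	/// can be backed into `NewBackedCandidate`s.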
fn get_backed(&self) -> Vec<NewBackedCandidate> {
let proposed = self.table.proposed_candidates(&self.table_context);
let mut res = Vec::with_capacity(proposed.len());
for p in proposed.into_iter() {
match table_attested_to_backed(p, &self.table_context) {
Some(backed) => res.push(NewBackedCandidate(backed)),
				None => continue,
			}
}
res
}
/// Check if there have happened any new misbehaviors and issue necessary messages.
/// TODO: Report multiple misbehaviors (https://github.com/paritytech/polkadot/issues/1387)
async fn issue_new_misbehaviors(&mut self) -> Result<(), Error> {
let mut reports = Vec::new();
for (k, v) in self.table.get_misbehavior().iter() {
if !self.reported_misbehavior_for.contains(k) {
self.reported_misbehavior_for.insert(*k);
let f = FromTableMisbehavior {
id: *k,
report: v.clone(),
signing_context: self.table_context.signing_context.clone(),
key: self.table_context.validators[*k as usize].clone(),
};
if let Ok(report) = MisbehaviorReport::try_from(f) {
let message = ProvisionerMessage::ProvisionableData(
ProvisionableData::MisbehaviorReport(self.parent, report),
);
reports.push(message);
}
}
}
for report in reports.drain(..) {
self.send_to_provisioner(report).await?
}
Ok(())
}
/// Import a statement into the statement table and return the summary of the import.
async fn import_statement(
&mut self,
statement: &SignedFullStatement,
) -> Result<Option<TableSummary>, Error> {
let stmt = primitive_statement_to_table(statement);
let summary = self.table.import_statement(&self.table_context, stmt);
if let Some(ref summary) = summary {
if let Some(attested) = self.table.attested_candidate(
&summary.candidate,
&self.table_context,
) {
// `HashSet::insert` returns true if the thing wasn't in there already.
// one of the few places the Rust-std folks did a bad job with API
if self.backed.insert(summary.candidate) {
if let Some(backed) =
table_attested_to_backed(attested, &self.table_context)
{
let message = ProvisionerMessage::ProvisionableData(
ProvisionableData::BackedCandidate(backed),
);
self.send_to_provisioner(message).await?;
}
}
}
}
self.issue_new_misbehaviors().await?;
		Ok(summary)
	}
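	/// Process a single `CandidateBackingMessage` addressed to this job's relay parent.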
async fn process_msg(&mut self, msg: CandidateBackingMessage) -> Result<(), Error> {
match msg {
CandidateBackingMessage::Second(_, candidate, pov) => {
// Sanity check that candidate is from our assignment.
if candidate.descriptor().para_id != self.assignment {
return Ok(());
}
// If the message is a `CandidateBackingMessage::Second`, sign and dispatch a
// Seconded statement only if we have not seconded any other candidate and
// have not signed a Valid statement for the requested candidate.
				// This job has not seconded a candidate yet.
				if self.seconded.is_none() {
let candidate_hash = candidate.hash();
if !self.issued_statements.contains(&candidate_hash) {
if let Ok(true) = self.validate_and_second(
&candidate,
pov,
).await {
self.metrics.on_candidate_seconded();
self.seconded = Some(candidate_hash);
}
}
}
}
CandidateBackingMessage::Statement(_, statement) => {
self.check_statement_signature(&statement)?;
match self.maybe_validate_and_import(statement).await {
Err(Error::ValidationFailed(_)) => return Ok(()),
Err(e) => return Err(e),
Ok(()) => (),
}
}
CandidateBackingMessage::GetBackedCandidates(_, tx) => {
let backed = self.get_backed();
tx.send(backed).map_err(|data| Error::Send(data))?;
}
}
Ok(())
}
/// Kick off validation work and distribute the result as a signed statement.
async fn kick_off_validation_work(
&mut self,
summary: TableSummary,
) -> Result<(), Error> {
let candidate_hash = summary.candidate.clone();
if self.issued_statements.contains(&candidate_hash) {
return Ok(())
}
// We clone the commitments here because there are borrowck
// errors relating to this being a struct and methods borrowing the entirety of self
// and not just those things that the function uses.
let candidate = self.table.get_candidate(&candidate_hash).ok_or(Error::CandidateNotFound)?;
let expected_commitments = candidate.commitments.clone();
let descriptor = candidate.descriptor().clone();
// Check that candidate is collated by the right collator.
if self.required_collator.as_ref()
.map_or(false, |c| c != &descriptor.collator)
{
// If not, we've got the statement in the table but we will
// not issue validation work for it.
//
// Act as though we've issued a statement.
self.issued_statements.insert(candidate_hash);
return Ok(());
}
let pov = self.request_pov_from_distribution(descriptor.clone()).await?;
let v = self.request_candidate_validation(descriptor, pov.clone()).await?;
let statement = match v {
ValidationResult::Valid(outputs, validation_data) => {
// If validation produces a new set of commitments, we vote the candidate as invalid.
let commitments_check = self.make_pov_available(
(&*pov).clone(),
					validation_data,
					outputs,
|commitments| if commitments == expected_commitments {
Ok(())
} else {
Err(())
}
).await?;
match commitments_check {
Ok(()) => Statement::Valid(candidate_hash),
Err(()) => Statement::Invalid(candidate_hash),
}
			}
			ValidationResult::Invalid(_reason) => {
Statement::Invalid(candidate_hash)
}
};
self.issued_statements.insert(candidate_hash);
self.sign_import_and_distribute_statement(statement).await
}
/// Import the statement and kick off validation work if it is a part of our assignment.
async fn maybe_validate_and_import(
&mut self,
statement: SignedFullStatement,
) -> Result<(), Error> {
if let Some(summary) = self.import_statement(&statement).await? {
if let Statement::Seconded(_) = statement.payload() {
if summary.group_id == self.assignment {
self.kick_off_validation_work(summary).await?;
}
}
}
Ok(())
}
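	/// Sign the given statement with the local validator key.
	///
	/// Returns `None` if we are not a validator for this relay parent or signing fails.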
async fn sign_statement(&self, statement: Statement) -> Option<SignedFullStatement> {
let signed = self.table_context
.validator
.as_ref()?
.sign(self.keystore.clone(), statement)
.await
.ok()?;
self.metrics.on_statement_signed();
Some(signed)
}
fn check_statement_signature(&self, statement: &SignedFullStatement) -> Result<(), Error> {
let idx = statement.validator_index() as usize;
if self.table_context.validators.len() > idx {
statement.check_signature(
&self.table_context.signing_context,
&self.table_context.validators[idx],
).map_err(|_| Error::InvalidSignature)?;
} else {
return Err(Error::InvalidSignature);
}
Ok(())
}
async fn send_to_provisioner(&mut self, msg: ProvisionerMessage) -> Result<(), Error> {
self.tx_from.send(FromJob::Provisioner(msg)).await?;
Ok(())
}
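	/// Fetch the `PoV` for the given candidate descriptor from the PoV distribution subsystem.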
async fn request_pov_from_distribution(
&mut self,
descriptor: CandidateDescriptor,
	) -> Result<Arc<PoV>, Error> {
		let (tx, rx) = oneshot::channel();
self.tx_from.send(FromJob::PoVDistribution(
PoVDistributionMessage::FetchPoV(self.parent, descriptor, tx)
)).await?;
		Ok(rx.await?)
	}
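	/// Ask the candidate validation subsystem to validate the candidate from chain state.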
async fn request_candidate_validation(
&mut self,
candidate: CandidateDescriptor,
pov: Arc<PoV>,
) -> Result<ValidationResult, Error> {
let (tx, rx) = oneshot::channel();
self.tx_from.send(FromJob::CandidateValidation(
CandidateValidationMessage::ValidateFromChainState(
candidate,
pov,
tx,
)
)
).await?;
Ok(rx.await??)
}
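	/// Store the given `AvailableData` in the availability store.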
	async fn store_available_data(
		&mut self,
		id: Option<ValidatorIndex>,
n_validators: u32,
available_data: AvailableData,
	) -> Result<(), Error> {
		let (tx, rx) = oneshot::channel();
self.tx_from.send(FromJob::AvailabilityStore(
AvailabilityStoreMessage::StoreAvailableData(
self.parent,
id,
n_validators,
available_data,
tx,
				)
			)
		).await?;

		let _ = rx.await?;

		Ok(())
	}

	// Make a `PoV` available.
	//
// This calls an inspection function before making the PoV available for any last checks
// that need to be done. If the inspection function returns an error, this function returns
// early without making the PoV available.
	async fn make_pov_available<T, E>(
		&mut self,
		pov: PoV,
		validation_data: polkadot_primitives::v1::PersistedValidationData,
outputs: ValidationOutputs,
with_commitments: impl FnOnce(CandidateCommitments) -> Result<T, E>,
	) -> Result<Result<T, E>, Error> {
		let available_data = AvailableData {
			pov,
			validation_data,
		};

let chunks = erasure_coding::obtain_chunks_v1(
self.table_context.validators.len(),
&available_data,
)?;
let branches = erasure_coding::branches(chunks.as_ref());
let erasure_root = branches.root();
let commitments = CandidateCommitments {
upward_messages: outputs.upward_messages,
erasure_root,
new_validation_code: outputs.new_validation_code,
head_data: outputs.head_data,
processed_downward_messages: outputs.processed_downward_messages,
		};

		let res = match with_commitments(commitments) {
Ok(x) => x,
Err(e) => return Ok(Err(e)),
};
self.store_available_data(
self.table_context.validator.as_ref().map(|v| v.index()),
self.table_context.validators.len() as u32,
available_data,
).await?;
		Ok(Ok(res))
	}
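	/// Pass a signed statement to the statement distribution subsystem for sharing with peers.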
async fn distribute_signed_statement(&mut self, s: SignedFullStatement) -> Result<(), Error> {
let smsg = StatementDistributionMessage::Share(self.parent, s);
self.tx_from.send(FromJob::StatementDistribution(smsg)).await?;
Ok(())
}
}
impl util::JobTrait for CandidateBackingJob {
type ToJob = ToJob;
type FromJob = FromJob;
type Error = Error;
	type RunArgs = SyncCryptoStorePtr;
	type Metrics = Metrics;

	const NAME: &'static str = "CandidateBackingJob";

	fn run(
		parent: Hash,
		keystore: SyncCryptoStorePtr,
		metrics: Metrics,
rx_to: mpsc::Receiver<Self::ToJob>,
mut tx_from: mpsc::Sender<Self::FromJob>,
) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>> {
async move {
macro_rules! try_runtime_api {
($x: expr) => {
match $x {
Ok(x) => x,
Err(e) => {
log::warn!(
target: "candidate_backing",
"Failed to fetch runtime API data for job: {:?}",
e,
);
// We can't do candidate validation work if we don't have the
// requisite runtime API data. But these errors should not take
// down the node.
return Ok(());
}
}
}
}
let (validators, groups, session_index, cores) = futures::try_join!(
request_validators(parent, &mut tx_from).await?,
request_validator_groups(parent, &mut tx_from).await?,
request_session_index_for_child(parent, &mut tx_from).await?,
request_from_runtime(
parent,
&mut tx_from,
|tx| RuntimeApiRequest::AvailabilityCores(tx),
).await?,
			)?;

let validators = try_runtime_api!(validators);
let (validator_groups, group_rotation_info) = try_runtime_api!(groups);
let session_index = try_runtime_api!(session_index);
let cores = try_runtime_api!(cores);
let signing_context = SigningContext { parent_hash: parent, session_index };
let validator = match Validator::construct(
&validators,
signing_context,
keystore.clone(),
).await {
Ok(v) => v,
Err(util::Error::NotAValidator) => { return Ok(()) },
Err(e) => {
log::warn!(
target: "candidate_backing",
"Cannot participate in candidate backing: {:?}",
e
);
return Ok(())
}
};
let mut groups = HashMap::new();
let n_cores = cores.len();
let mut assignment = None;
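			// Find our assignment: the scheduled core whose validator group contains our
			// validator index determines the para (and required collator) we work on.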
for (idx, core) in cores.into_iter().enumerate() {
// Ignore prospective assignments on occupied cores for the time being.
if let CoreState::Scheduled(scheduled) = core {
let core_index = CoreIndex(idx as _);
let group_index = group_rotation_info.group_for_core(core_index, n_cores);
if let Some(g) = validator_groups.get(group_index.0 as usize) {
if g.contains(&validator.index()) {
assignment = Some((scheduled.para_id, scheduled.collator));
}
groups.insert(scheduled.para_id, g.clone());
					}
				}
			}

let table_context = TableContext {
groups,
validators,
signing_context: validator.signing_context().clone(),
validator: Some(validator),
};
let (assignment, required_collator) = match assignment {
None => return Ok(()), // no need to work.
Some((a, r)) => (a, r),
};
let job = CandidateBackingJob {
parent,
rx_to,
tx_from,
assignment,
required_collator,
issued_statements: HashSet::new(),
				seconded: None,
				backed: HashSet::new(),
reported_misbehavior_for: HashSet::new(),
keystore,
table: Table::default(),
table_context,
				metrics,
			};

			job.run_loop().await
		}.boxed()
	}
}

#[derive(Clone)]
struct MetricsInner {
signed_statements_total: prometheus::Counter<prometheus::U64>,
candidates_seconded_total: prometheus::Counter<prometheus::U64>
}
/// Candidate backing metrics.
#[derive(Default, Clone)]
pub struct Metrics(Option<MetricsInner>);
impl Metrics {
fn on_statement_signed(&self) {
if let Some(metrics) = &self.0 {
metrics.signed_statements_total.inc();
}
}
fn on_candidate_seconded(&self) {
if let Some(metrics) = &self.0 {
metrics.candidates_seconded_total.inc();
}
}
}
impl metrics::Metrics for Metrics {
fn try_register(registry: &prometheus::Registry) -> Result<Self, prometheus::PrometheusError> {
let metrics = MetricsInner {
signed_statements_total: prometheus::register(
prometheus::Counter::new(
"parachain_signed_statements_total",
"Number of statements signed.",
)?,
registry,
)?,
candidates_seconded_total: prometheus::register(
prometheus::Counter::new(
"parachain_candidates_seconded_total",
"Number of candidates seconded.",
)?,
registry,
)?,
};
Ok(Metrics(Some(metrics)))
}
}
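// The `delegated_subsystem!` macro from `polkadot_node_subsystem_util` generates the
// `CandidateBackingSubsystem` wrapper, which spawns one `CandidateBackingJob` per active
// relay parent, using `SyncCryptoStorePtr` as the run arguments and `Metrics` as its metrics.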
delegated_subsystem!(CandidateBackingJob(SyncCryptoStorePtr, Metrics) <- ToJob as CandidateBackingSubsystem);
#[cfg(test)]
mod tests {
use super::*;
use assert_matches::assert_matches;
use futures::{future, Future};
	use polkadot_primitives::v1::{
		ScheduledCore, BlockData, CandidateCommitments,
		PersistedValidationData, ValidationData, TransientValidationData, HeadData,
		ValidityAttestation, GroupRotationInfo,
	};
	use polkadot_subsystem::{
		messages::RuntimeApiRequest,
		ActiveLeavesUpdate, FromOverseer, OverseerSignal,
	};
	use sp_keyring::Sr25519Keyring;
use polkadot_node_primitives::InvalidCandidate;
use sp_application_crypto::AppKey;
use sp_keystore::{CryptoStore, SyncCryptoStore};
fn validator_pubkeys(val_ids: &[Sr25519Keyring]) -> Vec<ValidatorId> {
val_ids.iter().map(|v| v.public().into()).collect()
}
struct TestState {
chain_ids: Vec<ParaId>,
keystore: SyncCryptoStorePtr,
validators: Vec<Sr25519Keyring>,
validator_public: Vec<ValidatorId>,
validation_data: ValidationData,
validator_groups: (Vec<Vec<ValidatorIndex>>, GroupRotationInfo),
availability_cores: Vec<CoreState>,
head_data: HashMap<ParaId, HeadData>,
signing_context: SigningContext,
relay_parent: Hash,
}
impl Default for TestState {
fn default() -> Self {
let chain_a = ParaId::from(1);
let chain_b = ParaId::from(2);
let thread_a = ParaId::from(3);
let chain_ids = vec![chain_a, chain_b, thread_a];
let validators = vec![
Sr25519Keyring::Alice,
Sr25519Keyring::Bob,
Sr25519Keyring::Charlie,
Sr25519Keyring::Dave,
				Sr25519Keyring::Ferdie,
				// A sixth validator so that index 5 in the validator groups below is valid.
				Sr25519Keyring::One,
			];

let keystore = Arc::new(sc_keystore::LocalKeystore::in_memory());
// Make sure `Alice` key is in the keystore, so this mocked node will be a parachain validator.
SyncCryptoStore::sr25519_generate_new(&*keystore, ValidatorId::ID, Some(&validators[0].to_seed()))
.expect("Insert key into keystore");
let validator_public = validator_pubkeys(&validators);
let validator_groups = vec![vec![2, 0, 3, 5], vec![1], vec![4]];
let group_rotation_info = GroupRotationInfo {
session_start_block: 0,
group_rotation_frequency: 100,
now: 1,
};
let thread_collator: CollatorId = Sr25519Keyring::Two.public().into();
let availability_cores = vec![
CoreState::Scheduled(ScheduledCore {
para_id: chain_a,
collator: None,
}),
CoreState::Scheduled(ScheduledCore {
para_id: chain_b,
collator: None,
}),
CoreState::Scheduled(ScheduledCore {
para_id: thread_a,
collator: Some(thread_collator.clone()),
}),
];
let mut head_data = HashMap::new();
head_data.insert(chain_a, HeadData(vec![4, 5, 6]));