Newer
Older
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Implements a `CandidateBackingSubsystem`.
#![deny(unused_crate_dependencies)]
use std::collections::{HashMap, HashSet};
use std::convert::TryFrom;
use std::pin::Pin;
use bitvec::vec::BitVec;
use futures::{
channel::{mpsc, oneshot},
Future, FutureExt, SinkExt, StreamExt,
use sp_keystore::SyncCryptoStorePtr;
CommittedCandidateReceipt, BackedCandidate, Id as ParaId, ValidatorId,
ValidatorIndex, SigningContext, PoV,
CandidateDescriptor, AvailableData, ValidatorSignature, Hash, CandidateReceipt,
CandidateCommitments, CoreState, CoreIndex, CollatorId, ValidationOutputs,
FromTableMisbehavior, Statement, SignedFullStatement, MisbehaviorReport, ValidationResult,
messages::{
AllMessages, AvailabilityStoreMessage, CandidateBackingMessage, CandidateSelectionMessage,
CandidateValidationMessage, NewBackedCandidate, PoVDistributionMessage, ProvisionableData,
ProvisionerMessage, RuntimeApiMessage, StatementDistributionMessage, ValidationFailed,
asynchronous rob
committed
RuntimeApiRequest,
Peter Goodspeed-Niklaus
committed
};
use polkadot_node_subsystem_util::{
self as util,
request_session_index_for_child,
request_validator_groups,
request_validators,
request_from_runtime,
Validator,
};
use statement_table::{
generic::AttestedCandidate as TableAttestedCandidate,
Context as TableContextTrait,
Table,
v1::{
Statement as TableStatement,
SignedStatement as TableSignedStatement, Summary as TableSummary,
},
// NOTE(review): the `enum Error {` header (and its derives) is not visible in this view, and the
// first `#[error]` attribute below has no variant after it. `Error::CandidateNotFound` and
// `Error::InvalidSignature` are constructed elsewhere in this file but are not declared here —
// confirm against the full file.
#[error("Candidate is not found")]
#[error("Failed to send candidates {0:?}")]
/// Carries the backed candidates that could not be delivered to the requester.
Send(Vec<NewBackedCandidate>),
#[error("Oneshot never resolved")]
/// A oneshot reply channel was dropped before a value was sent.
Oneshot(#[from] #[source] oneshot::Canceled),
#[error("Obtaining erasure chunks failed")]
/// Erasure-coding the available data failed.
ObtainErasureChunks(#[from] #[source] erasure_coding::Error),
#[error(transparent)]
/// Wraps a `ValidationFailed` error.
ValidationFailed(#[from] ValidationFailed),
#[error(transparent)]
/// Sending on an mpsc channel failed.
Mpsc(#[from] mpsc::SendError),
#[error(transparent)]
/// Error bubbled up from the subsystem utility helpers.
UtilError(#[from] util::Error),
}
/// Holds all data needed for candidate backing job operation.
struct CandidateBackingJob {
/// The hash of the relay parent on top of which this job is doing its work.
parent: Hash,
/// Inbound message channel receiving part.
rx_to: mpsc::Receiver<ToJob>,
/// Outbound message channel sending part.
tx_from: mpsc::Sender<FromJob>,
// NOTE(review): a doc line reading "The `ParaId` assigned to this validator" sat here with no
// field after it. Methods in this file read `self.assignment`, so the field declaration appears
// to be missing from this view — confirm against the full file.
/// The collator required to author the candidate, if any.
required_collator: Option<CollatorId>,
/// Hashes of the candidates we have already issued `Valid` or `Invalid` statements about.
issued_statements: HashSet<Hash>,
/// `Some(h)` if this job has already issued a `Seconded` statement for the candidate with hash `h`.
seconded: Option<Hash>,
/// We have already reported misbehaviors for these validators.
reported_misbehavior_for: HashSet<ValidatorIndex>,
/// Keystore handed to the validator when signing statements.
keystore: SyncCryptoStorePtr,
/// Statement table that imported statements are recorded in.
table: Table<TableContext>,
/// Context passed to every `table` call.
table_context: TableContext,
// NOTE(review): `self.metrics` is also used by methods in this file but is not declared in this
// view — confirm against the full file.
}
/// Number of votes needed for a quorum in a group of `n_validators`: strictly more than half.
const fn group_quorum(n_validators: usize) -> usize {
    let half = n_validators / 2;
    half + 1
}
/// Context handed to the statement table: who the validators are and how they are grouped.
// NOTE(review): `validator` is read through `table_context.validator` and set where the context
// is built, but no such field is declared in this view — confirm against the full file.
#[derive(Default)]
struct TableContext {
/// Signing context used when checking statement signatures and building misbehavior reports.
signing_context: SigningContext,
/// Validator indices of the group responsible for each para.
groups: HashMap<ParaId, Vec<ValidatorIndex>>,
/// Validator public keys, indexed by validator index.
validators: Vec<ValidatorId>,
}
/// Lets the statement table query group membership and vote thresholds from a `TableContext`.
impl TableContextTrait for TableContext {
    type AuthorityId = ValidatorIndex;
    type Digest = Hash;
    type GroupId = ParaId;
    type Signature = ValidatorSignature;
    type Candidate = CommittedCandidateReceipt;

    /// A candidate is identified by its hash.
    fn candidate_digest(candidate: &CommittedCandidateReceipt) -> Hash {
        candidate.hash()
    }

    /// A candidate belongs to the group of the para named in its descriptor.
    fn candidate_group(candidate: &CommittedCandidateReceipt) -> ParaId {
        candidate.descriptor().para_id
    }

    /// `true` if `authority` is listed in `group`; unknown groups have no members.
    fn is_member_of(&self, authority: &ValidatorIndex, group: &ParaId) -> bool {
        self.groups.get(group).map_or(false, |g| g.contains(authority))
    }

    /// Votes required for `group`; unknown groups get `usize::max_value()`, a threshold no
    /// vote count can reach.
    fn requisite_votes(&self, group: &ParaId) -> usize {
        self.groups.get(group).map_or(usize::max_value(), |g| group_quorum(g.len()))
    }
}
/// A message type that is sent from `CandidateBackingSubsystem` to `CandidateBackingJob`.
pub enum ToJob {
/// A `CandidateBackingMessage` for the job to process.
CandidateBacking(CandidateBackingMessage),
/// Stop working: the job's run loop exits when it receives this.
Stop,
}
impl TryFrom<AllMessages> for ToJob {
type Error = ();
fn try_from(msg: AllMessages) -> Result<Self, Self::Error> {
match msg {
AllMessages::CandidateBacking(msg) => Ok(ToJob::CandidateBacking(msg)),
_ => Err(()),
}
impl From<CandidateBackingMessage> for ToJob {
fn from(msg: CandidateBackingMessage) -> Self {
Self::CandidateBacking(msg)
impl util::ToJobTrait for ToJob {
    /// The message that tells a job to stop.
    const STOP: Self = ToJob::Stop;

    /// Relay parent the wrapped message refers to; `Stop` carries none.
    fn relay_parent(&self) -> Option<Hash> {
        match self {
            Self::CandidateBacking(cb) => cb.relay_parent(),
            Self::Stop => None,
        }
    }
}
/// A message type that is sent from `CandidateBackingJob` to `CandidateBackingSubsystem`.
///
/// Each variant wraps the message type of one destination; `From<FromJob> for AllMessages`
/// maps every variant onto the corresponding `AllMessages` variant.
enum FromJob {
/// Message for the availability store.
AvailabilityStore(AvailabilityStoreMessage),
/// Message for the runtime API.
RuntimeApiMessage(RuntimeApiMessage),
/// Message for candidate validation.
CandidateValidation(CandidateValidationMessage),
/// Message for candidate selection.
CandidateSelection(CandidateSelectionMessage),
/// Message for the provisioner.
Provisioner(ProvisionerMessage),
/// Message for PoV distribution.
PoVDistribution(PoVDistributionMessage),
/// Message for statement distribution.
StatementDistribution(StatementDistributionMessage),
}
/// Lifts a job-originated message into the matching `AllMessages` variant.
impl From<FromJob> for AllMessages {
    fn from(f: FromJob) -> Self {
        match f {
            FromJob::AvailabilityStore(inner) => Self::AvailabilityStore(inner),
            FromJob::CandidateSelection(inner) => Self::CandidateSelection(inner),
            FromJob::CandidateValidation(inner) => Self::CandidateValidation(inner),
            FromJob::PoVDistribution(inner) => Self::PoVDistribution(inner),
            FromJob::Provisioner(inner) => Self::Provisioner(inner),
            FromJob::RuntimeApiMessage(inner) => Self::RuntimeApi(inner),
            FromJob::StatementDistribution(inner) => Self::StatementDistribution(inner),
        }
    }
}
/// Inverse of `From<FromJob> for AllMessages`: succeeds only for the variants a job can emit.
impl TryFrom<AllMessages> for FromJob {
    type Error = &'static str;

    fn try_from(f: AllMessages) -> Result<Self, Self::Error> {
        let converted = match f {
            AllMessages::AvailabilityStore(inner) => Self::AvailabilityStore(inner),
            AllMessages::CandidateSelection(inner) => Self::CandidateSelection(inner),
            AllMessages::CandidateValidation(inner) => Self::CandidateValidation(inner),
            AllMessages::PoVDistribution(inner) => Self::PoVDistribution(inner),
            AllMessages::Provisioner(inner) => Self::Provisioner(inner),
            AllMessages::RuntimeApi(inner) => Self::RuntimeApiMessage(inner),
            AllMessages::StatementDistribution(inner) => Self::StatementDistribution(inner),
            // Anything else is not something a backing job sends.
            _ => return Err("can't convert this AllMessages variant to FromJob"),
        };
        Ok(converted)
    }
}
/// Convert a signed full statement into the representation the statement table imports.
///
/// `Seconded` becomes a table `Candidate` statement; `Valid` / `Invalid` keep their candidate
/// hash. Signature and sender index are carried over unchanged.
// A free function rather than an `impl From`, which does not appear to be possible given the
// current state of the code.
fn primitive_statement_to_table(s: &SignedFullStatement) -> TableSignedStatement {
    let sender = s.validator_index();
    let signature = s.signature().clone();
    let statement = match s.payload() {
        Statement::Seconded(receipt) => TableStatement::Candidate(receipt.clone()),
        Statement::Valid(hash) => TableStatement::Valid(*hash),
        Statement::Invalid(hash) => TableStatement::Invalid(*hash),
    };

    TableSignedStatement { statement, signature, sender }
}
impl CandidateBackingJob {
/// Drive the job: process incoming backing messages until told to stop or until the inbound
/// channel closes. The first processing error ends the loop and is returned.
async fn run_loop(mut self) -> Result<(), Error> {
    loop {
        match self.rx_to.next().await {
            Some(ToJob::CandidateBacking(msg)) => self.process_msg(msg).await?,
            Some(ToJob::Stop) | None => break,
        }
    }

    Ok(())
}
async fn issue_candidate_invalid_message(
&mut self,
) -> Result<(), Error> {
self.tx_from.send(FromJob::CandidateSelection(
CandidateSelectionMessage::Invalid(self.parent, candidate)
)).await?;
Ok(())
}
/// Validate the candidate that is requested to be `Second`ed and distribute validation result.
///
/// Returns `Ok(true)` if we issued a `Seconded` statement about this candidate.
async fn validate_and_second(
&mut self,
candidate: &CandidateReceipt,
pov: PoV,
) -> Result<bool, Error> {
// Check that candidate is collated by the right collator.
if self.required_collator.as_ref()
.map_or(false, |c| c != &candidate.descriptor().collator)
{
self.issue_candidate_invalid_message(candidate.clone()).await?;
return Ok(false);
}
let valid = self.request_candidate_validation(
candidate.descriptor().clone(),
Arc::new(pov.clone()),
).await?;
let candidate_hash = candidate.hash();
let statement = match valid {
ValidationResult::Valid(outputs, validation_data) => {
// make PoV available for later distribution. Send data to the availability
// store to keep. Sign and dispatch `valid` statement to network if we
// have not seconded the given candidate.
//
// If the commitments hash produced by validation is not the same as given by
// the collator, do not make available and report the collator.
let commitments_check = self.make_pov_available(
pov,
outputs,
|commitments| if commitments.hash() == candidate.commitments_hash {
Ok(CommittedCandidateReceipt {
descriptor: candidate.descriptor().clone(),
commitments,
})
} else {
Err(())
},
).await?;
match commitments_check {
Ok(candidate) => {
self.issued_statements.insert(candidate_hash);
Some(Statement::Seconded(candidate))
}
Err(()) => {
self.issue_candidate_invalid_message(candidate.clone()).await?;
None
}
}
ValidationResult::Invalid(_reason) => {
// no need to issue a statement about this if we aren't seconding it.
//
// there's an infinite amount of garbage out there. no need to acknowledge
// all of it.
self.issue_candidate_invalid_message(candidate.clone()).await?;
None
let issued_statement = statement.is_some();
if let Some(statement) = statement {
if let Some(signed_statement) = self.sign_statement(statement).await {
self.import_statement(&signed_statement).await?;
self.distribute_signed_statement(signed_statement).await?;
}
}
/// Collect every candidate the statement table currently proposes, as `NewBackedCandidate`s.
///
/// For each candidate the validity votes are paired with a bitfield over this job's assigned
/// group: bit `i` is set when the `i`-th group member voted. Candidates are skipped when the
/// table context has no group for this job's assignment.
fn get_backed(&self) -> Vec<NewBackedCandidate> {
    let proposed = self.table.proposed_candidates(&self.table_context);
    let mut res = Vec::with_capacity(proposed.len());

    for p in proposed {
        let TableAttestedCandidate { candidate, validity_votes, .. } = p;

        let (ids, validity_votes): (Vec<_>, Vec<_>) = validity_votes
            .into_iter()
            .map(|(id, vote)| (id, vote.into()))
            .unzip();

        let group = match self.table_context.groups.get(&self.assignment) {
            Some(group) => group,
            None => continue,
        };

        let mut validator_indices = BitVec::with_capacity(group.len());
        validator_indices.resize(group.len(), false);

        for id in &ids {
            // Voters that are not members of the group leave no bit set.
            if let Some(position) = group.iter().position(|x| x == id) {
                validator_indices.set(position, true);
            }
        }

        // Move the freshly built value into the result; no copy is needed.
        res.push(NewBackedCandidate(BackedCandidate {
            candidate,
            validity_votes,
            validator_indices,
        }));
    }

    res
}
/// Check if there have happened any new misbehaviors and issue necessary messages.
/// TODO: Report multiple misbehaviors (https://github.com/paritytech/polkadot/issues/1387)
async fn issue_new_misbehaviors(&mut self) -> Result<(), Error> {
let mut reports = Vec::new();
for (k, v) in self.table.get_misbehavior().iter() {
if !self.reported_misbehavior_for.contains(k) {
self.reported_misbehavior_for.insert(*k);
let f = FromTableMisbehavior {
id: *k,
report: v.clone(),
signing_context: self.table_context.signing_context.clone(),
key: self.table_context.validators[*k as usize].clone(),
};
if let Ok(report) = MisbehaviorReport::try_from(f) {
let message = ProvisionerMessage::ProvisionableData(
ProvisionableData::MisbehaviorReport(self.parent, report),
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
);
reports.push(message);
}
}
}
for report in reports.drain(..) {
self.send_to_provisioner(report).await?
}
Ok(())
}
/// Feed `statement` into the statement table, then report any misbehavior the table has
/// recorded and that was not reported before.
///
/// Returns the table's import summary, if it produced one.
async fn import_statement(
    &mut self,
    statement: &SignedFullStatement,
) -> Result<Option<TableSummary>, Error> {
    let table_statement = primitive_statement_to_table(statement);
    let outcome = self.table.import_statement(&self.table_context, table_statement);

    self.issue_new_misbehaviors().await?;

    Ok(outcome)
}
async fn process_msg(&mut self, msg: CandidateBackingMessage) -> Result<(), Error> {
match msg {
CandidateBackingMessage::Second(_, candidate, pov) => {
// Sanity check that candidate is from our assignment.
if candidate.descriptor().para_id != self.assignment {
return Ok(());
}
// If the message is a `CandidateBackingMessage::Second`, sign and dispatch a
// Seconded statement only if we have not seconded any other candidate and
// have not signed a Valid statement for the requested candidate.
match self.seconded {
// This job has not seconded a candidate yet.
None => {
let candidate_hash = candidate.hash();
if !self.issued_statements.contains(&candidate_hash) {
if let Ok(true) = self.validate_and_second(
&candidate,
self.metrics.on_candidate_seconded();
self.seconded = Some(candidate_hash);
}
}
}
// This job has already seconded a candidate.
Some(_) => {}
}
}
CandidateBackingMessage::Statement(_, statement) => {
self.check_statement_signature(&statement)?;
match self.maybe_validate_and_import(statement).await {
Err(Error::ValidationFailed(_)) => return Ok(()),
Err(e) => return Err(e),
Ok(()) => (),
}
}
CandidateBackingMessage::GetBackedCandidates(_, tx) => {
let backed = self.get_backed();
tx.send(backed).map_err(|data| Error::Send(data))?;
}
}
Ok(())
}
/// Kick off validation work and distribute the result as a signed statement.
async fn kick_off_validation_work(
&mut self,
summary: TableSummary,
) -> Result<(), Error> {
let candidate_hash = summary.candidate.clone();
if self.issued_statements.contains(&candidate_hash) {
return Ok(())
}
// We clone the commitments here because there are borrowck
// errors relating to this being a struct and methods borrowing the entirety of self
// and not just those things that the function uses.
let candidate = self.table.get_candidate(&candidate_hash).ok_or(Error::CandidateNotFound)?;
let expected_commitments = candidate.commitments.clone();
let descriptor = candidate.descriptor().clone();
// Check that candidate is collated by the right collator.
if self.required_collator.as_ref()
.map_or(false, |c| c != &descriptor.collator)
{
// If not, we've got the statement in the table but we will
// not issue validation work for it.
//
// Act as though we've issued a statement.
self.issued_statements.insert(candidate_hash);
return Ok(());
}
let pov = self.request_pov_from_distribution(descriptor.clone()).await?;
let v = self.request_candidate_validation(descriptor, pov.clone()).await?;
let statement = match v {
ValidationResult::Valid(outputs, validation_data) => {
// If validation produces a new set of commitments, we vote the candidate as invalid.
let commitments_check = self.make_pov_available(
(&*pov).clone(),
outputs,
|commitments| if commitments == expected_commitments {
Ok(())
} else {
Err(())
}
).await?;
match commitments_check {
Ok(()) => Statement::Valid(candidate_hash),
Err(()) => Statement::Invalid(candidate_hash),
}
ValidationResult::Invalid(_reason) => {
Statement::Invalid(candidate_hash)
}
};
self.issued_statements.insert(candidate_hash);
if let Some(signed_statement) = self.sign_statement(statement).await {
self.distribute_signed_statement(signed_statement).await?;
}
}
/// Import the statement and kick off validation work if it is a part of our assignment.
///
/// Validation is started only for `Seconded` statements whose import produced a summary for
/// the group this job is assigned to.
async fn maybe_validate_and_import(
    &mut self,
    statement: SignedFullStatement,
) -> Result<(), Error> {
    let summary = match self.import_statement(&statement).await? {
        Some(summary) => summary,
        None => return Ok(()),
    };

    let ours = summary.group_id == self.assignment;
    match statement.payload() {
        Statement::Seconded(_) if ours => self.kick_off_validation_work(summary).await,
        _ => Ok(()),
    }
}
/// Sign `statement` with this node's validator key.
///
/// Returns `None` when the table context holds no validator or when signing fails; the
/// signed-statements metric is bumped only on success.
async fn sign_statement(&self, statement: Statement) -> Option<SignedFullStatement> {
    let signed = self.table_context
        .validator
        .as_ref()?
        .sign(self.keystore.clone(), statement)
        .await
        .ok()?;
    self.metrics.on_statement_signed();
    Some(signed)
}
fn check_statement_signature(&self, statement: &SignedFullStatement) -> Result<(), Error> {
let idx = statement.validator_index() as usize;
if self.table_context.validators.len() > idx {
statement.check_signature(
&self.table_context.signing_context,
&self.table_context.validators[idx],
).map_err(|_| Error::InvalidSignature)?;
} else {
return Err(Error::InvalidSignature);
}
Ok(())
}
async fn send_to_provisioner(&mut self, msg: ProvisionerMessage) -> Result<(), Error> {
self.tx_from.send(FromJob::Provisioner(msg)).await?;
Ok(())
}
async fn request_pov_from_distribution(
&mut self,
descriptor: CandidateDescriptor,
let (tx, rx) = oneshot::channel();
self.tx_from.send(FromJob::PoVDistribution(
PoVDistributionMessage::FetchPoV(self.parent, descriptor, tx)
)).await?;
}
/// Request validation of `candidate` with `pov` and wait for the verdict.
///
/// Fails if the request cannot be sent, the reply channel is dropped, or the validation
/// subsystem reports a failure instead of a result.
async fn request_candidate_validation(
    &mut self,
    candidate: CandidateDescriptor,
    pov: Arc<PoV>,
) -> Result<ValidationResult, Error> {
    let (tx, rx) = oneshot::channel();

    let request = CandidateValidationMessage::ValidateFromChainState(candidate, pov, tx);
    self.tx_from.send(FromJob::CandidateValidation(request)).await?;

    // First `?`: dropped reply channel; second `?`: validation failure.
    let outcome = rx.await??;
    Ok(outcome)
}
// NOTE(review): the `async fn store_available_data(&mut self, ...` header these parameters
// belong to is not visible in this view (the function is called by that name from
// `make_pov_available`) — confirm against the full file.
id: Option<ValidatorIndex>,
n_validators: u32,
available_data: AvailableData,
// NOTE(review): the return type and opening brace are also missing before the body below.
let (tx, rx) = oneshot::channel();
// Ask the availability store to keep the data; `tx` travels with the request, presumably so
// the store can reply on it.
self.tx_from.send(FromJob::AvailabilityStore(
AvailabilityStoreMessage::StoreAvailableData(
self.parent,
id,
n_validators,
available_data,
tx,
)
// NOTE(review): the call is never closed or awaited and `rx` is never read in this view —
// the tail of the function is missing; confirm against the full file.
//
// This calls an inspection function before making the PoV available for any last checks
// that need to be done. If the inspection function returns an error, this function returns
// early without making the PoV available.
//
// `T` / `E` are the success / error types of `with_commitments`. An inspection failure is
// reported as `Ok(Err(e))`; failures of erasure coding or of storing the data use the outer
// `Error`.
//
// NOTE(review): callers pass four arguments (`pov` first) and the body uses `self` and
// `available_data`, but neither the `&mut self` / `pov` parameters nor the construction of
// `available_data` are visible here; the `CandidateCommitments` literal is not closed and no
// final value is returned — lines appear to be missing, confirm against the full file.
async fn make_pov_available<T, E>(
validation_data: polkadot_primitives::v1::PersistedValidationData,
outputs: ValidationOutputs,
with_commitments: impl FnOnce(CandidateCommitments) -> Result<T, E>,
) -> Result<Result<T, E>, Error> {
// Erasure-code the data into one chunk per validator.
let chunks = erasure_coding::obtain_chunks_v1(
self.table_context.validators.len(),
&available_data,
)?;
// The root over the chunk branches becomes the erasure root of the commitments.
let branches = erasure_coding::branches(chunks.as_ref());
let erasure_root = branches.root();
let commitments = CandidateCommitments {
upward_messages: outputs.upward_messages,
erasure_root,
new_validation_code: outputs.new_validation_code,
head_data: outputs.head_data,
processed_downward_messages: outputs.processed_downward_messages,
// Let the caller inspect the commitments; bail out before storing anything if it objects.
let res = match with_commitments(commitments) {
Ok(x) => x,
Err(e) => return Ok(Err(e)),
};
// Store under our own validator index (if we are a validator) and the validator count.
self.store_available_data(
self.table_context.validator.as_ref().map(|v| v.index()),
self.table_context.validators.len() as u32,
available_data,
).await?;
}
/// Hand a signed statement to statement distribution for sharing under this job's relay parent.
async fn distribute_signed_statement(&mut self, s: SignedFullStatement) -> Result<(), Error> {
    let share = FromJob::StatementDistribution(
        StatementDistributionMessage::Share(self.parent, s),
    );
    self.tx_from.send(share).await?;

    Ok(())
}
}
impl util::JobTrait for CandidateBackingJob {
type ToJob = ToJob;
type FromJob = FromJob;
type Error = Error;
type RunArgs = SyncCryptoStorePtr;
const NAME: &'static str = "CandidateBackingJob";
keystore: SyncCryptoStorePtr,
rx_to: mpsc::Receiver<Self::ToJob>,
mut tx_from: mpsc::Sender<Self::FromJob>,
) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>> {
async move {
asynchronous rob
committed
// Unwraps the result of a runtime API request.
// On `Ok(x)` the macro evaluates to `x`. On `Err(e)` it logs a warning and returns `Ok(())`
// from the enclosing function or async block, so the job ends quietly instead of failing.
macro_rules! try_runtime_api {
($x: expr) => {
match $x {
Ok(x) => x,
Err(e) => {
log::warn!(
target: "candidate_backing",
"Failed to fetch runtime API data for job: {:?}",
e,
);
// We can't do candidate validation work if we don't have the
// requisite runtime API data. But these errors should not take
// down the node.
return Ok(());
}
}
}
}
let (validators, groups, session_index, cores) = futures::try_join!(
request_validators(parent, &mut tx_from).await?,
request_validator_groups(parent, &mut tx_from).await?,
asynchronous rob
committed
request_session_index_for_child(parent, &mut tx_from).await?,
request_from_runtime(
parent,
&mut tx_from,
|tx| RuntimeApiRequest::AvailabilityCores(tx),
).await?,
asynchronous rob
committed
let validators = try_runtime_api!(validators);
let (validator_groups, group_rotation_info) = try_runtime_api!(groups);
let session_index = try_runtime_api!(session_index);
let cores = try_runtime_api!(cores);
let signing_context = SigningContext { parent_hash: parent, session_index };
let validator = match Validator::construct(
&validators,
signing_context,
keystore.clone(),
).await {
Ok(v) => v,
Err(util::Error::NotAValidator) => { return Ok(()) },
Err(e) => {
log::warn!(
target: "candidate_backing",
"Cannot participate in candidate backing: {:?}",
e
);
return Ok(())
}
};
let mut groups = HashMap::new();
asynchronous rob
committed
let n_cores = cores.len();
let mut assignment = None;
asynchronous rob
committed
for (idx, core) in cores.into_iter().enumerate() {
// Ignore prospective assignments on occupied cores for the time being.
if let CoreState::Scheduled(scheduled) = core {
let core_index = CoreIndex(idx as _);
let group_index = group_rotation_info.group_for_core(core_index, n_cores);
if let Some(g) = validator_groups.get(group_index.0 as usize) {
if g.contains(&validator.index()) {
assignment = Some((scheduled.para_id, scheduled.collator));
}
asynchronous rob
committed
groups.insert(scheduled.para_id, g.clone());
}
let table_context = TableContext {
groups,
validators,
signing_context: validator.signing_context().clone(),
validator: Some(validator),
};
let (assignment, required_collator) = match assignment {
None => return Ok(()), // no need to work.
Some((a, r)) => (a, r),
};
let job = CandidateBackingJob {
parent,
rx_to,
tx_from,
assignment,
required_collator,
issued_statements: HashSet::new(),
seconded: None,
reported_misbehavior_for: HashSet::new(),
keystore,
table: Table::default(),
table_context,
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
/// The registered Prometheus counters behind `Metrics`.
#[derive(Clone)]
struct MetricsInner {
/// Registered as `parachain_signed_statements_total`: number of statements signed.
signed_statements_total: prometheus::Counter<prometheus::U64>,
/// Registered as `parachain_candidates_seconded_total`: number of candidates seconded.
candidates_seconded_total: prometheus::Counter<prometheus::U64>
}
/// Candidate backing metrics.
///
/// Wraps `None` when no counters were registered (this is also the `Default`), in which case
/// every recording method is a no-op.
#[derive(Default, Clone)]
pub struct Metrics(Option<MetricsInner>);
impl Metrics {
    /// Count one signed statement; does nothing when metrics are disabled.
    fn on_statement_signed(&self) {
        match self.0.as_ref() {
            Some(inner) => inner.signed_statements_total.inc(),
            None => {}
        }
    }

    /// Count one seconded candidate; does nothing when metrics are disabled.
    fn on_candidate_seconded(&self) {
        match self.0.as_ref() {
            Some(inner) => inner.candidates_seconded_total.inc(),
            None => {}
        }
    }
}
impl metrics::Metrics for Metrics {
    /// Create both counters and register them with `registry`.
    ///
    /// Fails with the Prometheus error if a counter cannot be created or registered.
    fn try_register(registry: &prometheus::Registry) -> Result<Self, prometheus::PrometheusError> {
        let signed = prometheus::Counter::new(
            "parachain_signed_statements_total",
            "Number of statements signed.",
        )?;
        let signed_statements_total = prometheus::register(signed, registry)?;

        let seconded = prometheus::Counter::new(
            "parachain_candidates_seconded_total",
            "Number of candidates seconded.",
        )?;
        let candidates_seconded_total = prometheus::register(seconded, registry)?;

        let inner = MetricsInner { signed_statements_total, candidates_seconded_total };
        Ok(Metrics(Some(inner)))
    }
}
// NOTE(review): `delegated_subsystem!` is defined outside this file; presumably it generates the
// `CandidateBackingSubsystem` type that runs `CandidateBackingJob`s fed with `ToJob` messages —
// confirm at the macro definition.
delegated_subsystem!(CandidateBackingJob(SyncCryptoStorePtr, Metrics) <- ToJob as CandidateBackingSubsystem);
#[cfg(test)]
mod tests {
use super::*;
use assert_matches::assert_matches;
use futures::{future, Future};
ScheduledCore, BlockData, CandidateCommitments,
PersistedValidationData, ValidationData, TransientValidationData, HeadData,
ValidityAttestation, GroupRotationInfo,
asynchronous rob
committed
messages::RuntimeApiRequest,
ActiveLeavesUpdate, FromOverseer, OverseerSignal,
use polkadot_node_primitives::InvalidCandidate;
use sp_application_crypto::AppKey;
use sp_keystore::{CryptoStore, SyncCryptoStore};
/// Public validator ids of the given keyrings, in the same order.
fn validator_pubkeys(val_ids: &[Sr25519Keyring]) -> Vec<ValidatorId> {
    let mut keys = Vec::with_capacity(val_ids.len());
    for keyring in val_ids {
        keys.push(keyring.public().into());
    }
    keys
}
struct TestState {
chain_ids: Vec<ParaId>,
keystore: SyncCryptoStorePtr,
validators: Vec<Sr25519Keyring>,
validator_public: Vec<ValidatorId>,
validation_data: ValidationData,
asynchronous rob
committed
validator_groups: (Vec<Vec<ValidatorIndex>>, GroupRotationInfo),
availability_cores: Vec<CoreState>,
head_data: HashMap<ParaId, HeadData>,
signing_context: SigningContext,
relay_parent: Hash,
}
impl Default for TestState {
fn default() -> Self {
let chain_a = ParaId::from(1);
let chain_b = ParaId::from(2);
let thread_a = ParaId::from(3);
let chain_ids = vec![chain_a, chain_b, thread_a];
let validators = vec![
Sr25519Keyring::Alice,
Sr25519Keyring::Bob,
Sr25519Keyring::Charlie,
Sr25519Keyring::Dave,
Sr25519Keyring::Ferdie,
];
let keystore = Arc::new(sc_keystore::LocalKeystore::in_memory());
// Make sure `Alice` key is in the keystore, so this mocked node will be a parachain validator.
SyncCryptoStore::sr25519_generate_new(&*keystore, ValidatorId::ID, Some(&validators[0].to_seed()))
.expect("Insert key into keystore");
let validator_public = validator_pubkeys(&validators);
asynchronous rob
committed
let validator_groups = vec![vec![2, 0, 3], vec![1], vec![4]];
let group_rotation_info = GroupRotationInfo {
session_start_block: 0,
group_rotation_frequency: 100,
now: 1,
};
let thread_collator: CollatorId = Sr25519Keyring::Two.public().into();
asynchronous rob
committed
let availability_cores = vec![
CoreState::Scheduled(ScheduledCore {
para_id: chain_a,
collator: None,
}),
CoreState::Scheduled(ScheduledCore {
para_id: chain_b,
collator: None,
}),
CoreState::Scheduled(ScheduledCore {
para_id: thread_a,
collator: Some(thread_collator.clone()),
}),
];
asynchronous rob
committed
let mut head_data = HashMap::new();
head_data.insert(chain_a, HeadData(vec![4, 5, 6]));
asynchronous rob
committed
let relay_parent = Hash::from([5; 32]);
let signing_context = SigningContext {
session_index: 1,
asynchronous rob
committed
parent_hash: relay_parent,
let validation_data = ValidationData {
persisted: PersistedValidationData {
parent_head: HeadData(vec![7, 8, 9]),
block_number: Default::default(),
hrmp_mqc_heads: Vec::new(),
dmq_mqc_head: Default::default(),
},
transient: TransientValidationData {
max_code_size: 1000,
max_head_data_size: 1000,
balance: Default::default(),
code_upgrade_allowed: None,
};
Self {
chain_ids,
keystore,
validators,
validator_public,
asynchronous rob
committed
validator_groups: (validator_groups, group_rotation_info),
availability_cores,