// Copyright 2020 Parity Technologies (UK) Ltd. // This file is part of Polkadot. // Polkadot is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // Polkadot is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . //! Implements a `CandidateBackingSubsystem`. #![recursion_limit="256"] use std::collections::{HashMap, HashSet}; use std::convert::TryFrom; use std::pin::Pin; use std::time::Duration; use bitvec::vec::BitVec; use log; use futures::{ select, FutureExt, SinkExt, StreamExt, channel::{oneshot, mpsc}, future::{self, Either}, task::{Spawn, SpawnError, SpawnExt}, }; use futures_timer::Delay; use streamunordered::{StreamUnordered, StreamYield}; use primitives::Pair; use keystore::KeyStorePtr; use polkadot_primitives::{ Hash, parachain::{ AbridgedCandidateReceipt, BackedCandidate, Id as ParaId, ValidatorPair, ValidatorId, ValidatorIndex, HeadData, SigningContext, PoVBlock, OmittedValidationData, CandidateDescriptor, LocalValidationData, GlobalValidationSchedule, AvailableData, ErasureChunk, }, }; use polkadot_node_primitives::{ FromTableMisbehavior, Statement, SignedFullStatement, MisbehaviorReport, ValidationResult, }; use polkadot_subsystem::{ FromOverseer, OverseerSignal, Subsystem, SubsystemContext, SpawnedSubsystem, }; use polkadot_subsystem::messages::{ AllMessages, CandidateBackingMessage, CandidateSelectionMessage, SchedulerRoster, RuntimeApiMessage, RuntimeApiRequest, CandidateValidationMessage, ValidationFailed, StatementDistributionMessage, NewBackedCandidate, ProvisionerMessage, 
ProvisionableData, PoVDistributionMessage, AvailabilityStoreMessage,
};
use statement_table::{
	generic::AttestedCandidate as TableAttestedCandidate,
	Table, Context as TableContextTrait, Statement as TableStatement,
	SignedStatement as TableSignedStatement, Summary as TableSummary,
};

// Errors that can occur while running a candidate backing job.
//
// NOTE(review): generic type parameters appear to have been stripped from this
// file during extraction (e.g. `HashSet`, `Option`, `HashMap>` below are
// missing their `<...>` arguments) — confirm against the original source.
#[derive(Debug, derive_more::From)]
enum Error {
	NotInValidatorSet,
	CandidateNotFound,
	JobNotFound(Hash),
	InvalidSignature,
	#[from] Erasure(erasure_coding::Error),
	#[from] ValidationFailed(ValidationFailed),
	#[from] Oneshot(oneshot::Canceled),
	#[from] Mpsc(mpsc::SendError),
	#[from] Spawn(SpawnError),
}

/// Holds all data needed for candidate backing job operation.
struct CandidateBackingJob {
	/// The hash of the relay parent on top of which this job is doing it's work.
	parent: Hash,
	/// Inbound message channel receiving part.
	rx_to: mpsc::Receiver,
	/// Outbound message channel sending part.
	tx_from: mpsc::Sender,
	/// `HeadData`s of the parachains that this validator is assigned to.
	head_data: HeadData,
	/// The `ParaId`s assigned to this validator.
	assignment: ParaId,
	/// We issued `Valid` or `Invalid` statements on about these candidates.
	issued_statements: HashSet,
	/// `Some(h)` if this job has already issues `Seconded` statemt for some candidate with `h` hash.
	seconded: Option,
	/// We have already reported misbehaviors for these validators.
	reported_misbehavior_for: HashSet,
	// Statement table accumulating validators' statements about candidates.
	table: Table,
	// Validator set / group / signing context the table operates against.
	table_context: TableContext,
}

// Simple majority of a backing group is required for quorum.
const fn group_quorum(n_validators: usize) -> usize {
	(n_validators / 2) + 1
}

// Per-relay-parent context handed to the statement table.
#[derive(Default)]
struct TableContext {
	signing_context: SigningContext,
	// Our own validator key, if we are part of the set.
	key: Option,
	// Mapping from parachain to the validator indices assigned to back it.
	groups: HashMap>,
	// Full validator set for this relay parent.
	validators: Vec,
}

impl TableContextTrait for TableContext {
	// A validator is a member of a group iff its index occurs in the group's list.
	fn is_member_of(&self, authority: ValidatorIndex, group: &ParaId) -> bool {
		self.groups.get(group).map_or(false, |g| g.iter().position(|&a| a == authority).is_some())
	}

	// Unknown groups can never reach quorum (`usize::max_value()` required votes).
	fn requisite_votes(&self, group: &ParaId) -> usize {
		self.groups.get(group).map_or(usize::max_value(), |g| group_quorum(g.len()))
	}
}

impl TableContext {
	// Public key of our local validator key, if any.
	fn local_id(&self) -> Option {
		self.key.as_ref().map(|k| k.public())
	}

	// Index of our key within the validator set, if present.
	fn local_index(&self) -> Option {
		self.local_id().and_then(|id| self.validators
			.iter()
			.enumerate()
			.find(|(_, k)| k == &&id)
			.map(|(i, _)| i as ValidatorIndex)
		)
	}
}

// Bound for the job <-> subsystem mpsc channels.
const CHANNEL_CAPACITY: usize = 64;

/// A message type that is sent from `CandidateBackingSubsystem` to `CandidateBackingJob`.
enum ToJob {
	/// A `CandidateBackingMessage`.
	CandidateBacking(CandidateBackingMessage),
	/// Stop working.
	Stop,
}

/// A message type that is sent from `CandidateBackingJob` to `CandidateBackingSubsystem`.
enum FromJob {
	AvailabilityStore(AvailabilityStoreMessage),
	RuntimeApiMessage(RuntimeApiMessage),
	CandidateValidation(CandidateValidationMessage),
	CandidateSelection(CandidateSelectionMessage),
	Provisioner(ProvisionerMessage),
	PoVDistribution(PoVDistributionMessage),
	StatementDistribution(StatementDistributionMessage),
}

// Lift a job-level message into the overseer-level `AllMessages` type.
impl From for AllMessages {
	fn from(f: FromJob) -> Self {
		match f {
			FromJob::AvailabilityStore(msg) => AllMessages::AvailabilityStore(msg),
			FromJob::RuntimeApiMessage(msg) => AllMessages::RuntimeApi(msg),
			FromJob::CandidateValidation(msg) => AllMessages::CandidateValidation(msg),
			FromJob::CandidateSelection(msg) => AllMessages::CandidateSelection(msg),
			FromJob::StatementDistribution(msg) => AllMessages::StatementDistribution(msg),
			FromJob::PoVDistribution(msg) => AllMessages::PoVDistribution(msg),
			FromJob::Provisioner(msg) => AllMessages::Provisioner(msg),
		}
	}
}

// It looks like it's not possible to do an `impl From` given the current state of
// the code. So this does the necessary conversion.
fn primitive_statement_to_table(s: &SignedFullStatement) -> TableSignedStatement {
	// Map the high-level statement payload onto the statement-table vocabulary.
	let statement = match s.payload() {
		Statement::Seconded(c) => TableStatement::Candidate(c.clone()),
		Statement::Valid(h) => TableStatement::Valid(h.clone()),
		Statement::Invalid(h) => TableStatement::Invalid(h.clone()),
	};

	TableSignedStatement {
		statement,
		signature: s.signature().clone(),
		sender: s.validator_index(),
	}
}

// finds the first key we are capable of signing with out of the given set of validators,
// if any.
fn signing_key(validators: &[ValidatorId], keystore: &KeyStorePtr) -> Option {
	let keystore = keystore.read();
	validators.iter()
		.find_map(|v| {
			keystore.key_pair::(&v).ok()
		})
}

impl CandidateBackingJob {
	/// Run asynchronously.
	/// Run asynchronously.
	///
	/// Consumes inbound messages until the channel is closed or a non-
	/// `CandidateBacking` message (i.e. `Stop`) arrives.
	async fn run(mut self) -> Result<(), Error> {
		while let Some(msg) = self.rx_to.next().await {
			match msg {
				ToJob::CandidateBacking(msg) => {
					self.process_msg(msg).await?;
				}
				_ => break,
			}
		}

		Ok(())
	}

	// Tell the candidate selection subsystem that `candidate` was found invalid.
	async fn issue_candidate_invalid_message(
		&mut self,
		candidate: AbridgedCandidateReceipt,
	) -> Result<(), Error> {
		self.tx_from.send(FromJob::CandidateSelection(
			CandidateSelectionMessage::Invalid(self.parent, candidate)
		)).await?;

		Ok(())
	}

	/// Validate the candidate that is requested to be `Second`ed and distribute validation result.
	async fn validate_and_second(
		&mut self,
		candidate: AbridgedCandidateReceipt,
		pov: PoVBlock,
	) -> Result {
		let valid = self.request_candidate_validation(candidate.clone(), pov.clone()).await?;
		// `valid` is the (ValidationResult, GlobalValidationSchedule, LocalValidationData) tuple.
		let statement = match valid.0 {
			ValidationResult::Valid => {
				// make PoV available for later distribution. Send data to the availability
				// store to keep. Sign and dispatch `valid` statement to network if we
				// have not seconded the given candidate.
				self.make_pov_available(pov, valid.1, valid.2).await?;
				self.issued_statements.insert(candidate.hash());
				Statement::Seconded(candidate)
			}
			ValidationResult::Invalid => {
				let candidate_hash = candidate.hash();
				self.issue_candidate_invalid_message(candidate).await?;
				Statement::Invalid(candidate_hash)
			}
		};

		// Import our own statement locally, then share it with peers.
		if let Some(signed_statement) = self.sign_statement(statement) {
			self.import_statement(&signed_statement).await?;
			self.distribute_signed_statement(signed_statement).await?;
		}

		Ok(valid.0)
	}

	// Collect all candidates that currently have enough validity votes in the
	// table, packaged as `NewBackedCandidate`s.
	fn get_backed(&self) -> Vec {
		let proposed = self.table.proposed_candidates(&self.table_context);
		let mut res = Vec::with_capacity(proposed.len());

		for p in proposed.into_iter() {
			let TableAttestedCandidate {
				candidate,
				validity_votes,
				..
			} = p;

			let (ids, validity_votes): (Vec<_>, Vec<_>) = validity_votes
				.into_iter()
				.map(|(id, vote)| (id, vote.into()))
				.unzip();

			// NOTE(review): the bitfield is built against *our* assignment's group —
			// this assumes all proposed candidates belong to our assigned para.
			let group = match self.table_context.groups.get(&self.assignment) {
				Some(group) => group,
				None => continue,
			};

			// One bit per group member; set for members whose votes are included.
			let mut validator_indices = BitVec::with_capacity(
				group.len()
			);

			validator_indices.resize(group.len(), false);

			for id in ids.iter() {
				if let Some(position) = group.iter().position(|x| x == id) {
					validator_indices.set(position, true);
				}
			}

			let backed = BackedCandidate {
				candidate,
				validity_votes,
				validator_indices,
			};

			res.push(NewBackedCandidate(backed.clone()));
		}

		res
	}

	/// Check if there have happened any new misbehaviors and issue necessary messages.
	///
	/// TODO: Report multiple misbehaviors (https://github.com/paritytech/polkadot/issues/1387)
	async fn issue_new_misbehaviors(&mut self) -> Result<(), Error> {
		let mut reports = Vec::new();

		for (k, v) in self.table.get_misbehavior().iter() {
			// Only report each offender once.
			if !self.reported_misbehavior_for.contains(k) {
				self.reported_misbehavior_for.insert(*k);

				let f = FromTableMisbehavior {
					id: *k,
					report: v.clone(),
					signing_context: self.table_context.signing_context.clone(),
					key: self.table_context.validators[*k as usize].clone(),
				};

				if let Ok(report) = MisbehaviorReport::try_from(f) {
					let message = ProvisionerMessage::ProvisionableData(
						ProvisionableData::MisbehaviorReport(self.parent, report)
					);

					reports.push(message);
				}
			}
		}

		// Dispatch after the loop: sending needs `&mut self`, which would conflict
		// with the immutable borrow of `self.table` held during iteration.
		for report in reports.drain(..) {
			self.send_to_provisioner(report).await?
		}

		Ok(())
	}

	/// Import a statement into the statement table and return the summary of the import.
async fn import_statement( &mut self, statement: &SignedFullStatement, ) -> Result, Error> { let stmt = primitive_statement_to_table(statement); let summary = self.table.import_statement(&self.table_context, stmt); self.issue_new_misbehaviors().await?; return Ok(summary); } async fn process_msg(&mut self, msg: CandidateBackingMessage) -> Result<(), Error> { match msg { CandidateBackingMessage::Second(_, candidate, pov) => { // Sanity check that candidate is from our assignment. if candidate.parachain_index != self.assignment { return Ok(()); } // If the message is a `CandidateBackingMessage::Second`, sign and dispatch a // Seconded statement only if we have not seconded any other candidate and // have not signed a Valid statement for the requested candidate. match self.seconded { // This job has not seconded a candidate yet. None => { let candidate_hash = candidate.hash(); if !self.issued_statements.contains(&candidate_hash) { if let Ok(ValidationResult::Valid) = self.validate_and_second( candidate, pov, ).await { self.seconded = Some(candidate_hash); } } } // This job has already seconded a candidate. Some(_) => {} } } CandidateBackingMessage::Statement(_, statement) => { self.check_statement_signature(&statement)?; self.maybe_validate_and_import(statement).await?; } CandidateBackingMessage::GetBackedCandidates(_, tx) => { let backed = self.get_backed(); tx.send(backed).map_err(|_| oneshot::Canceled)?; } } Ok(()) } /// Kick off validation work and distribute the result as a signed statement. 
	/// Kick off validation work and distribute the result as a signed statement.
	async fn kick_off_validation_work(
		&mut self,
		summary: TableSummary,
	) -> Result {
		let candidate = self.table.get_candidate(&summary.candidate).ok_or(Error::CandidateNotFound)?;
		let candidate = candidate.clone();
		let descriptor = candidate.to_descriptor();
		let candidate_hash = candidate.hash();

		// Fetch the PoV from the distribution subsystem, then validate against it.
		let pov = self.request_pov_from_distribution(descriptor).await?;
		let v = self.request_candidate_validation(candidate, pov).await?;

		let statement = match v.0 {
			ValidationResult::Valid => {
				Statement::Valid(candidate_hash)
			}
			ValidationResult::Invalid => {
				Statement::Invalid(candidate_hash)
			}
		};

		self.issued_statements.insert(candidate_hash);

		if let Some(signed_statement) = self.sign_statement(statement) {
			self.distribute_signed_statement(signed_statement).await?;
		}

		Ok(v.0)
	}

	/// Import the statement and kick off validation work if it is a part of our assignment.
	async fn maybe_validate_and_import(
		&mut self,
		statement: SignedFullStatement,
	) -> Result<(), Error> {
		if let Some(summary) = self.import_statement(&statement).await? {
			// Only `Seconded` statements for our assigned para trigger validation.
			if let Statement::Seconded(_) = statement.payload() {
				if summary.group_id == self.assignment {
					self.kick_off_validation_work(summary).await?;
				}
			}
		}

		Ok(())
	}

	// Sign `statement` with our validator key; `None` if we are not in the set.
	fn sign_statement(&self, statement: Statement) -> Option {
		let local_index = self.table_context.local_index()?;

		let signing_key = self.table_context.key.as_ref()?;

		let signed_statement = SignedFullStatement::sign(
			statement,
			&self.table_context.signing_context,
			local_index,
			signing_key,
		);

		Some(signed_statement)
	}

	// Verify that the statement's signature matches the claimed validator index.
	fn check_statement_signature(&self, statement: &SignedFullStatement) -> Result<(), Error> {
		let idx = statement.validator_index() as usize;

		if self.table_context.validators.len() > idx {
			statement.check_signature(
				&self.table_context.signing_context,
				&self.table_context.validators[idx],
			).map_err(|_| Error::InvalidSignature)?;
		} else {
			// Claimed index is out of range of the validator set.
			return Err(Error::InvalidSignature);
		}

		Ok(())
	}

	async fn send_to_provisioner(&mut self, msg: ProvisionerMessage) -> Result<(), Error> {
		self.tx_from.send(FromJob::Provisioner(msg)).await?;

		Ok(())
	}

	// Ask the PoV distribution subsystem for the PoV matching `descriptor`.
	async fn request_pov_from_distribution(
		&mut self,
		descriptor: CandidateDescriptor,
	) -> Result {
		let (tx, rx) = oneshot::channel();

		self.tx_from.send(FromJob::PoVDistribution(
			PoVDistributionMessage::FetchPoV(self.parent, descriptor, tx)
		)).await?;

		let pov = rx.await?;
		// The PoV arrives behind a shared pointer; clone out an owned copy.
		Ok((*pov).clone())
	}

	// Ask the candidate validation subsystem to validate `candidate` against `pov`.
	async fn request_candidate_validation(
		&mut self,
		candidate: AbridgedCandidateReceipt,
		pov: PoVBlock,
	) -> Result<(ValidationResult, GlobalValidationSchedule, LocalValidationData), Error> {
		let (tx, rx) = oneshot::channel();

		self.tx_from.send(FromJob::CandidateValidation(
				CandidateValidationMessage::Validate(
					self.parent,
					candidate,
					self.head_data.clone(),
					pov,
					tx,
				)
			)
		).await?;

		// Outer `?` is the oneshot channel; inner `?` the validation outcome.
		Ok(rx.await??)
	}

	// Hand one erasure chunk to the availability store.
	async fn store_chunk(
		&mut self,
		id: ValidatorIndex,
		chunk: ErasureChunk,
	) -> Result<(), Error> {
		self.tx_from.send(FromJob::AvailabilityStore(
				AvailabilityStoreMessage::StoreChunk(self.parent, id, chunk)
			)
		).await?;

		Ok(())
	}

	// Erasure-code the PoV plus the omitted validation data and store one chunk
	// per validator in the availability store.
	async fn make_pov_available(
		&mut self,
		pov_block: PoVBlock,
		global_validation: GlobalValidationSchedule,
		local_validation: LocalValidationData,
	) -> Result<(), Error> {
		let omitted_validation = OmittedValidationData {
			global_validation,
			local_validation,
		};

		let available_data = AvailableData {
			pov_block,
			omitted_validation,
		};

		let chunks = erasure_coding::obtain_chunks(
			self.table_context.validators.len(),
			&available_data,
		)?;

		// Merkle branches proving each chunk's inclusion.
		let branches = erasure_coding::branches(chunks.as_ref());

		for (index, (chunk, proof)) in chunks.iter().zip(branches.map(|(proof, _)| proof)).enumerate() {
			let chunk = ErasureChunk {
				chunk: chunk.clone(),
				index: index as u32,
				proof,
			};

			self.store_chunk(index as ValidatorIndex, chunk).await?;
		}

		Ok(())
	}

	async fn distribute_signed_statement(&mut self, s: SignedFullStatement) -> Result<(), Error> {
		let smsg = StatementDistributionMessage::Share(self.parent, s);

		self.tx_from.send(FromJob::StatementDistribution(smsg)).await?;

		Ok(())
	}
}

// Handle to one spawned `CandidateBackingJob`.
struct JobHandle {
	abort_handle: future::AbortHandle,
	to_job: mpsc::Sender,
	finished: oneshot::Receiver<()>,
	// Token of this job's outgoing stream within `Jobs::outgoing_msgs`.
	su_handle: usize,
}

impl JobHandle {
	// Politely ask the job to stop; forcibly abort it if the one-second
	// grace timer fires before the job signals completion.
	async fn stop(mut self) {
		let _ = self.to_job.send(ToJob::Stop).await;

		let stop_timer = Delay::new(Duration::from_secs(1));

		match future::select(stop_timer, self.finished).await {
			Either::Left((_, _)) => {
			},
			Either::Right((_, _)) => {
				self.abort_handle.abort();
			},
		}
	}

	async fn send_msg(&mut self, msg: ToJob) -> Result<(), Error> {
		Ok(self.to_job.send(msg).await?)
	}
}

// Set of currently running jobs, keyed by relay parent.
struct Jobs {
	spawner: S,
	running: HashMap,
	// Multiplexes the `FromJob` streams of all running jobs.
	outgoing_msgs: StreamUnordered>,
}

// Body of one per-relay-parent job: gathers the validator set, groups, our
// assignment, head data and signing context, then runs `CandidateBackingJob`.
async fn run_job(
	parent: Hash,
	keystore: KeyStorePtr,
	rx_to: mpsc::Receiver,
	mut tx_from: mpsc::Sender,
) -> Result<(), Error> {
	let (validators, roster) = futures::try_join!(
		request_validators(parent, &mut tx_from).await?,
		request_validator_groups(parent, &mut tx_from).await?,
	)?;

	// Bail out early if we have no key in this validator set.
	let key = signing_key(&validators[..], &keystore).ok_or(Error::NotInValidatorSet)?;

	let mut groups = HashMap::new();

	for assignment in roster.scheduled {
		if let Some(g) = roster.validator_groups.get(assignment.group_idx.0 as usize) {
			groups.insert(
				assignment.para_id,
				g.clone(),
			);
		}
	}

	let mut assignment = Default::default();

	// Find the para whose backing group contains our validator index.
	if let Some(idx) = validators.iter().position(|k| *k == key.public()) {
		let idx = idx as u32;
		for (para_id, group) in groups.iter() {
			if group.contains(&idx) {
				assignment = *para_id;
				break;
			}
		}
	}

	let (
		head_data,
		signing_context,
	) = futures::try_join!(
		request_head_data(parent, &mut tx_from, assignment).await?,
		request_signing_context(parent, &mut tx_from).await?,
	)?;

	let table_context = TableContext {
		signing_context,
		key: Some(key),
		groups,
		validators,
	};

	let job = CandidateBackingJob {
		parent,
		rx_to,
		tx_from,
		head_data,
		assignment,
		issued_statements: HashSet::new(),
		seconded: None,
		reported_misbehavior_for: HashSet::new(),
		table: Table::default(),
		table_context,
	};

	job.run().await
}

/// Request a validator set from the `RuntimeApi`.
async fn request_validators(
	parent: Hash,
	s: &mut mpsc::Sender,
) -> Result>, Error> {
	let (tx, rx) = oneshot::channel();

	s.send(FromJob::RuntimeApiMessage(RuntimeApiMessage::Request(
			parent,
			RuntimeApiRequest::Validators(tx),
		)
	)).await?;

	Ok(rx)
}

/// Request the scheduler roster from `RuntimeApi`.
async fn request_validator_groups( parent: Hash, s: &mut mpsc::Sender, ) -> Result, Error> { let (tx, rx) = oneshot::channel(); s.send(FromJob::RuntimeApiMessage(RuntimeApiMessage::Request( parent, RuntimeApiRequest::ValidatorGroups(tx), ) )).await?; Ok(rx) } /// Request a `SigningContext` from the `RuntimeApi`. async fn request_signing_context( parent: Hash, s: &mut mpsc::Sender, ) -> Result, Error> { let (tx, rx) = oneshot::channel(); s.send(FromJob::RuntimeApiMessage(RuntimeApiMessage::Request( parent, RuntimeApiRequest::SigningContext(tx), ) )).await?; Ok(rx) } /// Request `HeadData` for some `ParaId` from `RuntimeApi`. async fn request_head_data( parent: Hash, s: &mut mpsc::Sender, id: ParaId, ) -> Result, Error> { let (tx, rx) = oneshot::channel(); s.send(FromJob::RuntimeApiMessage(RuntimeApiMessage::Request( parent, RuntimeApiRequest::HeadData(id, tx), ) )).await?; Ok(rx) } impl Jobs { fn new(spawner: S) -> Self { Self { spawner, running: HashMap::default(), outgoing_msgs: StreamUnordered::new(), } } fn spawn_job(&mut self, parent_hash: Hash, keystore: KeyStorePtr) -> Result<(), Error> { let (to_job_tx, to_job_rx) = mpsc::channel(CHANNEL_CAPACITY); let (from_job_tx, from_job_rx) = mpsc::channel(CHANNEL_CAPACITY); let (future, abort_handle) = future::abortable(async move { if let Err(e) = run_job(parent_hash, keystore, to_job_rx, from_job_tx).await { log::error!( "CandidateBackingJob({}) finished with an error {:?}", parent_hash, e, ); } }); let (finished_tx, finished) = oneshot::channel(); let future = async move { let _ = future.await; let _ = finished_tx.send(()); }; self.spawner.spawn(future)?; let su_handle = self.outgoing_msgs.push(from_job_rx); let handle = JobHandle { abort_handle, to_job: to_job_tx, finished, su_handle, }; self.running.insert(parent_hash, handle); Ok(()) } async fn stop_job(&mut self, parent_hash: Hash) -> Result<(), Error> { match self.running.remove(&parent_hash) { Some(handle) => { Pin::new(&mut 
self.outgoing_msgs).remove(handle.su_handle); handle.stop().await; Ok(()) } None => Err(Error::JobNotFound(parent_hash)) } } async fn send_msg(&mut self, parent_hash: Hash, msg: ToJob) -> Result<(), Error> { if let Some(job) = self.running.get_mut(&parent_hash) { job.send_msg(msg).await?; } Ok(()) } async fn next(&mut self) -> Option { self.outgoing_msgs.next().await.and_then(|(e, _)| match e { StreamYield::Item(e) => Some(e), _ => None, }) } } /// An implementation of the Candidate Backing subsystem. pub struct CandidateBackingSubsystem { spawner: S, keystore: KeyStorePtr, _context: std::marker::PhantomData, } impl CandidateBackingSubsystem where S: Spawn + Clone, Context: SubsystemContext, { /// Creates a new `CandidateBackingSubsystem`. pub fn new(keystore: KeyStorePtr, spawner: S) -> Self { Self { spawner, keystore, _context: std::marker::PhantomData, } } async fn run( mut ctx: Context, keystore: KeyStorePtr, spawner: S, ) { let mut jobs = Jobs::new(spawner.clone()); loop { select! { incoming = ctx.recv().fuse() => { match incoming { Ok(msg) => match msg { FromOverseer::Signal(OverseerSignal::StartWork(hash)) => { if let Err(e) = jobs.spawn_job(hash, keystore.clone()) { log::error!("Failed to spawn a job: {:?}", e); break; } } FromOverseer::Signal(OverseerSignal::StopWork(hash)) => { if let Err(e) = jobs.stop_job(hash).await { log::error!("Failed to spawn a job: {:?}", e); break; } } FromOverseer::Communication { msg } => { match msg { CandidateBackingMessage::Second(hash, _, _) | CandidateBackingMessage::Statement(hash, _) | CandidateBackingMessage::GetBackedCandidates(hash, _) => { let res = jobs.send_msg( hash.clone(), ToJob::CandidateBacking(msg), ).await; if let Err(e) = res { log::error!( "Failed to send a message to a job: {:?}", e, ); break; } } _ => (), } } _ => (), }, Err(_) => break, } } outgoing = jobs.next().fuse() => { match outgoing { Some(msg) => { let _ = ctx.send_message(msg.into()).await; } None => break, } } complete => break, } } } } impl 
Subsystem for CandidateBackingSubsystem
	where
		S: Spawn + Send + Clone + 'static,
		Context: SubsystemContext,
{
	// Package `run` as a `SpawnedSubsystem` future for the overseer to drive.
	fn start(self, ctx: Context) -> SpawnedSubsystem {
		let keystore = self.keystore.clone();
		let spawner = self.spawner.clone();

		SpawnedSubsystem(Box::pin(async move {
			Self::run(ctx, keystore, spawner).await;
		}))
	}
}

#[cfg(test)]
mod tests {
	use super::*;
	use futures::{Future, executor::{self, ThreadPool}};
	use std::collections::HashMap;
	use std::sync::Arc;
	use sp_keyring::Sr25519Keyring;
	use polkadot_primitives::parachain::{
		AssignmentKind, CollatorId, CoreAssignment, BlockData, CoreIndex, GroupIndex,
		ValidityAttestation,
	};
	use assert_matches::assert_matches;

	// Convert keyring entries into validator public keys.
	fn validator_pubkeys(val_ids: &[Sr25519Keyring]) -> Vec {
		val_ids.iter().map(|v| v.public().into()).collect()
	}

	// Shared fixture describing a small set of validators and parachains.
	struct TestState {
		chain_ids: Vec,
		keystore: KeyStorePtr,
		validators: Vec,
		validator_public: Vec,
		global_validation_schedule: GlobalValidationSchedule,
		local_validation_data: LocalValidationData,
		roster: SchedulerRoster,
		head_data: HashMap,
		signing_context: SigningContext,
		relay_parent: Hash,
	}

	impl Default for TestState {
		fn default() -> Self {
			let chain_a = ParaId::from(1);
			let chain_b = ParaId::from(2);
			let thread_a = ParaId::from(3);

			let chain_ids = vec![chain_a, chain_b, thread_a];

			let validators = vec![
				Sr25519Keyring::Alice,
				Sr25519Keyring::Bob,
				Sr25519Keyring::Charlie,
				Sr25519Keyring::Dave,
				Sr25519Keyring::Ferdie,
			];

			let keystore = keystore::Store::new_in_memory();
			// Make sure `Alice` key is in the keystore, so this mocked node will be a parachain validator.
			keystore.write().insert_ephemeral_from_seed::(&validators[0].to_seed())
				.expect("Insert key into keystore");

			let validator_public = validator_pubkeys(&validators);

			let chain_a_assignment = CoreAssignment {
				core: CoreIndex::from(0),
				para_id: chain_a,
				kind: AssignmentKind::Parachain,
				group_idx: GroupIndex::from(0),
			};

			let chain_b_assignment = CoreAssignment {
				core: CoreIndex::from(1),
				para_id: chain_b,
				kind: AssignmentKind::Parachain,
				group_idx: GroupIndex::from(1),
			};

			let thread_collator: CollatorId = Sr25519Keyring::Two.public().into();

			let thread_a_assignment = CoreAssignment {
				core: CoreIndex::from(2),
				para_id: thread_a,
				kind: AssignmentKind::Parathread(thread_collator.clone(), 0),
				group_idx: GroupIndex::from(2),
			};

			// Alice (validator index 0) sits in group 0 and is therefore
			// assigned to back `chain_a`.
			let validator_groups = vec![vec![2, 0, 3], vec![1], vec![4]];

			let parent_hash_1 = [1; 32].into();

			let roster = SchedulerRoster {
				validator_groups,
				scheduled: vec![
					chain_a_assignment,
					chain_b_assignment,
					thread_a_assignment,
				],
				upcoming: vec![],
				availability_cores: vec![],
			};

			let signing_context = SigningContext {
				session_index: 1,
				parent_hash: parent_hash_1,
			};

			let mut head_data = HashMap::new();
			head_data.insert(chain_a, HeadData(vec![4, 5, 6]));

			let relay_parent = Hash::from([5; 32]);

			let local_validation_data = LocalValidationData {
				parent_head: HeadData(vec![7, 8, 9]),
				balance: Default::default(),
				code_upgrade_allowed: None,
			};

			let global_validation_schedule = GlobalValidationSchedule {
				max_code_size: 1000,
				max_head_data_size: 1000,
				block_number: Default::default(),
			};

			Self {
				chain_ids,
				keystore,
				validators,
				validator_public,
				roster,
				head_data,
				local_validation_data,
				global_validation_schedule,
				signing_context,
				relay_parent,
			}
		}
	}

	struct TestHarness {
		virtual_overseer: subsystem_test::TestSubsystemContextHandle,
	}

	// Run `test` against a freshly started subsystem wired to a mock overseer.
	fn test_harness>(keystore: KeyStorePtr, test: impl FnOnce(TestHarness) -> T) {
		let pool = ThreadPool::new().unwrap();

		let (context, virtual_overseer) = subsystem_test::make_subsystem_context(pool.clone());

		let subsystem = CandidateBackingSubsystem::run(context, keystore, pool.clone());

		let test_fut = test(TestHarness {
			virtual_overseer,
		});

		futures::pin_mut!(test_fut);
		futures::pin_mut!(subsystem);

		// Whichever side finishes first ends the test run.
		executor::block_on(future::select(test_fut, subsystem));
	}

	// Tests that the subsystem performs actions that are requied on startup.
	async fn test_startup(
		virtual_overseer: &mut subsystem_test::TestSubsystemContextHandle,
		test_state: &TestState,
	) {
		// Start work on some new parent.
		virtual_overseer.send(FromOverseer::Signal(
			OverseerSignal::StartWork(test_state.relay_parent))
		).await;

		// Check that subsystem job issues a request for a validator set.
		assert_matches!(
			virtual_overseer.recv().await,
			AllMessages::RuntimeApi(
				RuntimeApiMessage::Request(parent, RuntimeApiRequest::Validators(tx))
			) if parent == test_state.relay_parent => {
				tx.send(test_state.validator_public.clone()).unwrap();
			}
		);

		// Check that subsystem job issues a request for the validator groups.
		assert_matches!(
			virtual_overseer.recv().await,
			AllMessages::RuntimeApi(
				RuntimeApiMessage::Request(parent, RuntimeApiRequest::ValidatorGroups(tx))
			) if parent == test_state.relay_parent => {
				tx.send(test_state.roster.clone()).unwrap();
			}
		);

		// Check that subsystem job issues a request for the head data.
		assert_matches!(
			virtual_overseer.recv().await,
			AllMessages::RuntimeApi(
				RuntimeApiMessage::Request(parent, RuntimeApiRequest::HeadData(id, tx))
			) if parent == test_state.relay_parent => {
				tx.send(test_state.head_data.get(&id).unwrap().clone()).unwrap();
			}
		);

		// Check that subsystem job issues a request for the signing context.
		assert_matches!(
			virtual_overseer.recv().await,
			AllMessages::RuntimeApi(
				RuntimeApiMessage::Request(parent, RuntimeApiRequest::SigningContext(tx))
			) if parent == test_state.relay_parent => {
				tx.send(test_state.signing_context.clone()).unwrap();
			}
		);
	}

	// Test that a `CandidateBackingMessage::Second` issues validation work
	// and in case validation is successful issues a `StatementDistributionMessage`.
#[test] fn backing_second_works() { let test_state = TestState::default(); test_harness(test_state.keystore.clone(), |test_harness| async move { let TestHarness { mut virtual_overseer } = test_harness; test_startup(&mut virtual_overseer, &test_state).await; let pov_block = PoVBlock { block_data: BlockData(vec![42, 43, 44]), }; let pov_block_hash = pov_block.hash(); let candidate = AbridgedCandidateReceipt { parachain_index: test_state.chain_ids[0], relay_parent: test_state.relay_parent, pov_block_hash, ..Default::default() }; let second = CandidateBackingMessage::Second( test_state.relay_parent, candidate.clone(), pov_block.clone(), ); virtual_overseer.send(FromOverseer::Communication{ msg: second }).await; let expected_head_data = test_state.head_data.get(&test_state.chain_ids[0]).unwrap(); assert_matches!( virtual_overseer.recv().await, AllMessages::CandidateValidation( CandidateValidationMessage::Validate( parent_hash, c, head_data, pov, tx, ) ) if parent_hash == test_state.relay_parent && pov == pov_block && c == candidate => { assert_eq!(head_data, *expected_head_data); tx.send(Ok(( ValidationResult::Valid, test_state.global_validation_schedule, test_state.local_validation_data, ))).unwrap(); } ); for _ in 0..test_state.validators.len() { assert_matches!( virtual_overseer.recv().await, AllMessages::AvailabilityStore( AvailabilityStoreMessage::StoreChunk(parent_hash, _, _) ) if parent_hash == test_state.relay_parent ); } assert_matches!( virtual_overseer.recv().await, AllMessages::StatementDistribution( StatementDistributionMessage::Share( parent_hash, signed_statement, ) ) if parent_hash == test_state.relay_parent => { signed_statement.check_signature( &test_state.signing_context, &test_state.validator_public[0], ).unwrap(); } ); virtual_overseer.send(FromOverseer::Signal( OverseerSignal::StopWork(test_state.relay_parent)) ).await; }); } // Test that the candidate reaches quorum succesfully. 
	#[test]
	fn backing_works() {
		let test_state = TestState::default();
		test_harness(test_state.keystore.clone(), |test_harness| async move {
			let TestHarness { mut virtual_overseer } = test_harness;

			test_startup(&mut virtual_overseer, &test_state).await;

			let pov_block = PoVBlock {
				block_data: BlockData(vec![1, 2, 3]),
			};

			let pov_block_hash = pov_block.hash();

			let candidate_a = AbridgedCandidateReceipt {
				parachain_index: test_state.chain_ids[0],
				relay_parent: test_state.relay_parent,
				pov_block_hash,
				..Default::default()
			};

			let candidate_a_hash = candidate_a.hash();

			// Validator 2 (a member of our backing group) seconds the candidate...
			let signed_a = SignedFullStatement::sign(
				Statement::Seconded(candidate_a.clone()),
				&test_state.signing_context,
				2,
				&test_state.validators[2].pair().into(),
			);

			// ...and validator 0 attests its validity.
			let signed_b = SignedFullStatement::sign(
				Statement::Valid(candidate_a_hash),
				&test_state.signing_context,
				0,
				&test_state.validators[0].pair().into(),
			);

			let statement = CandidateBackingMessage::Statement(test_state.relay_parent, signed_a.clone());

			virtual_overseer.send(FromOverseer::Communication{ msg: statement }).await;

			// Sending a `Statement::Seconded` for our assignment will start
			// validation process. The first thing requested is PoVBlock from the
			// `PoVDistribution`.
			assert_matches!(
				virtual_overseer.recv().await,
				AllMessages::PoVDistribution(
					PoVDistributionMessage::FetchPoV(relay_parent, _, tx)
				) if relay_parent == test_state.relay_parent => {
					tx.send(Arc::new(pov_block.clone())).unwrap();
				}
			);

			let expected_head_data = test_state.head_data.get(&test_state.chain_ids[0]).unwrap();

			// The next step is the actual request to Validation subsystem
			// to validate the `Seconded` candidate.
			assert_matches!(
				virtual_overseer.recv().await,
				AllMessages::CandidateValidation(
					CandidateValidationMessage::Validate(
						relay_parent,
						candidate,
						head_data,
						pov,
						tx,
					)
				) if relay_parent == test_state.relay_parent && candidate == candidate_a => {
					assert_eq!(head_data, *expected_head_data);
					assert_eq!(pov, pov_block);
					tx.send(Ok((
						ValidationResult::Valid,
						test_state.global_validation_schedule,
						test_state.local_validation_data,
					))).unwrap();
				}
			);

			let statement = CandidateBackingMessage::Statement(
				test_state.relay_parent,
				signed_b.clone(),
			);

			virtual_overseer.send(FromOverseer::Communication{ msg: statement }).await;

			let (tx, rx) = oneshot::channel();

			// The backed candidats set should be not empty at this point.
			virtual_overseer.send(FromOverseer::Communication{
				msg: CandidateBackingMessage::GetBackedCandidates(
					test_state.relay_parent,
					tx,
				)
			}).await;

			let backed = rx.await.unwrap();

			// `validity_votes` may be in any order so we can't do this in a single assert.
			assert_eq!(backed[0].0.candidate, candidate_a);
			assert_eq!(backed[0].0.validity_votes.len(), 2);
			assert!(backed[0].0.validity_votes.contains(
				&ValidityAttestation::Explicit(signed_b.signature().clone())
			));
			assert!(backed[0].0.validity_votes.contains(
				&ValidityAttestation::Implicit(signed_a.signature().clone())
			));
			// Group order is [2, 0, 3]; validators 2 and 0 voted, validator 3 did not.
			assert_eq!(backed[0].0.validator_indices, bitvec::bitvec![Lsb0, u8; 1, 1, 0]);

			virtual_overseer.send(FromOverseer::Signal(
				OverseerSignal::StopWork(test_state.relay_parent))
			).await;
		});
	}

	// Issuing conflicting statements on the same candidate should
	// be a misbehavior.
#[test]
fn backing_misbehavior_works() {
	let test_state = TestState::default();
	test_harness(test_state.keystore.clone(), |test_harness| async move {
		let TestHarness { mut virtual_overseer } = test_harness;

		// Bring the job up for `test_state.relay_parent`.
		test_startup(&mut virtual_overseer, &test_state).await;

		let pov_block = PoVBlock {
			block_data: BlockData(vec![1, 2, 3]),
		};

		let pov_block_hash = pov_block.hash();

		// Candidate for the para this validator is assigned to (chain_ids[0]).
		let candidate_a = AbridgedCandidateReceipt {
			parachain_index: test_state.chain_ids[0],
			relay_parent: test_state.relay_parent,
			pov_block_hash,
			..Default::default()
		};

		let candidate_a_hash = candidate_a.hash();

		// Validator 2 seconds the candidate.
		let signed_a = SignedFullStatement::sign(
			Statement::Seconded(candidate_a.clone()),
			&test_state.signing_context,
			2,
			&test_state.validators[2].pair().into(),
		);

		// Validator 0 issues `Valid`...
		let signed_b = SignedFullStatement::sign(
			Statement::Valid(candidate_a_hash),
			&test_state.signing_context,
			0,
			&test_state.validators[0].pair().into(),
		);

		// ...and then `Invalid` on the same candidate — a self-contradiction.
		let signed_c = SignedFullStatement::sign(
			Statement::Invalid(candidate_a_hash),
			&test_state.signing_context,
			0,
			&test_state.validators[0].pair().into(),
		);

		// Feed the `Seconded` statement in; the subsystem should fetch the PoV
		// and validate the candidate itself.
		let statement = CandidateBackingMessage::Statement(test_state.relay_parent, signed_a.clone());

		virtual_overseer.send(FromOverseer::Communication{ msg: statement }).await;

		// Subsystem asks PoV distribution for the PoV; answer with ours.
		assert_matches!(
			virtual_overseer.recv().await,
			AllMessages::PoVDistribution(
				PoVDistributionMessage::FetchPoV(relay_parent, _, tx)
			) if relay_parent == test_state.relay_parent => {
				tx.send(Arc::new(pov_block.clone())).unwrap();
			}
		);

		let expected_head_data = test_state.head_data.get(&test_state.chain_ids[0]).unwrap();

		// Subsystem requests validation; report the candidate as valid.
		assert_matches!(
			virtual_overseer.recv().await,
			AllMessages::CandidateValidation(
				CandidateValidationMessage::Validate(
					relay_parent,
					candidate,
					head_data,
					pov,
					tx,
				)
			) if relay_parent == test_state.relay_parent && candidate == candidate_a => {
				assert_eq!(pov, pov_block);
				assert_eq!(head_data, *expected_head_data);
				tx.send(Ok((
					ValidationResult::Valid,
					test_state.global_validation_schedule,
					test_state.local_validation_data,
				))).unwrap();
			}
		);

		// Having validated it ourselves, the subsystem shares our own `Valid`
		// statement (we are validator 0 in this harness).
		assert_matches!(
			virtual_overseer.recv().await,
			AllMessages::StatementDistribution(
				StatementDistributionMessage::Share(
					relay_parent,
					signed_statement,
				)
			) if relay_parent == test_state.relay_parent => {
				signed_statement.check_signature(
					&test_state.signing_context,
					&test_state.validator_public[0],
				).unwrap();

				assert_eq!(*signed_statement.payload(), Statement::Valid(candidate_a_hash));
			}
		);

		// Now feed in the two contradictory statements from validator 0.
		let statement = CandidateBackingMessage::Statement(test_state.relay_parent, signed_b.clone());

		virtual_overseer.send(FromOverseer::Communication{ msg: statement }).await;

		let statement = CandidateBackingMessage::Statement(test_state.relay_parent, signed_c.clone());

		virtual_overseer.send(FromOverseer::Communication{ msg: statement }).await;

		// The subsystem must report the self-contradiction to the provisioner,
		// carrying both correctly-signed conflicting statements.
		assert_matches!(
			virtual_overseer.recv().await,
			AllMessages::Provisioner(
				ProvisionerMessage::ProvisionableData(
					ProvisionableData::MisbehaviorReport(
						relay_parent,
						MisbehaviorReport::SelfContradiction(_, s1, s2),
					)
				)
			) if relay_parent == test_state.relay_parent => {
				s1.check_signature(
					&test_state.signing_context,
					&test_state.validator_public[s1.validator_index() as usize],
				).unwrap();

				s2.check_signature(
					&test_state.signing_context,
					&test_state.validator_public[s2.validator_index() as usize],
				).unwrap();
			}
		);
	});
}

// Test that if we are asked to second an invalid candidate we
// can still second a valid one afterwards.
#[test]
fn backing_dont_second_invalid() {
	let test_state = TestState::default();
	test_harness(test_state.keystore.clone(), |test_harness| async move {
		let TestHarness { mut virtual_overseer } = test_harness;

		// Bring the job up for `test_state.relay_parent`.
		test_startup(&mut virtual_overseer, &test_state).await;

		// Two distinct PoVs: the first candidate will be reported invalid,
		// the second valid.
		let pov_block_a = PoVBlock {
			block_data: BlockData(vec![42, 43, 44]),
		};

		let pov_block_b = PoVBlock {
			block_data: BlockData(vec![45, 46, 47]),
		};

		let pov_block_hash_a = pov_block_a.hash();
		let pov_block_hash_b = pov_block_b.hash();

		let candidate_a = AbridgedCandidateReceipt {
			parachain_index: test_state.chain_ids[0],
			relay_parent: test_state.relay_parent,
			pov_block_hash: pov_block_hash_a,
			..Default::default()
		};

		let candidate_a_hash = candidate_a.hash();

		let candidate_b = AbridgedCandidateReceipt {
			parachain_index: test_state.chain_ids[0],
			relay_parent: test_state.relay_parent,
			pov_block_hash: pov_block_hash_b,
			..Default::default()
		};

		// Ask the subsystem to second candidate A.
		let second = CandidateBackingMessage::Second(
			test_state.relay_parent,
			candidate_a.clone(),
			pov_block_a.clone(),
		);

		virtual_overseer.send(FromOverseer::Communication{ msg: second }).await;

		let expected_head_data = test_state.head_data.get(&test_state.chain_ids[0]).unwrap();

		// Subsystem requests validation; report candidate A as invalid.
		assert_matches!(
			virtual_overseer.recv().await,
			AllMessages::CandidateValidation(
				CandidateValidationMessage::Validate(
					parent_hash,
					c,
					head_data,
					pov,
					tx,
				)
			) if parent_hash == test_state.relay_parent && pov == pov_block_a && c == candidate_a => {
				assert_eq!(head_data, *expected_head_data);
				tx.send(Ok((
					ValidationResult::Invalid,
					test_state.global_validation_schedule.clone(),
					test_state.local_validation_data.clone(),
				))).unwrap();
			}
		);

		// The subsystem tells candidate selection the candidate was invalid...
		assert_matches!(
			virtual_overseer.recv().await,
			AllMessages::CandidateSelection(
				CandidateSelectionMessage::Invalid(parent_hash, candidate)
			) if parent_hash == test_state.relay_parent && candidate == candidate_a
		);

		// ...and shares an `Invalid` statement rather than seconding it.
		assert_matches!(
			virtual_overseer.recv().await,
			AllMessages::StatementDistribution(
				StatementDistributionMessage::Share(
					relay_parent,
					statement,
				)
			) if relay_parent == test_state.relay_parent => {
				assert_eq!(*statement.payload(), Statement::Invalid(candidate_a_hash));
			}
		);

		// Now ask it to second candidate B — this one should go through.
		let second = CandidateBackingMessage::Second(
			test_state.relay_parent,
			candidate_b.clone(),
			pov_block_b.clone(),
		);

		virtual_overseer.send(FromOverseer::Communication{ msg: second }).await;

		let expected_head_data = test_state.head_data.get(&test_state.chain_ids[0]).unwrap();

		// Report candidate B as valid.
		assert_matches!(
			virtual_overseer.recv().await,
			AllMessages::CandidateValidation(
				CandidateValidationMessage::Validate(
					parent_hash,
					c,
					head_data,
					pov,
					tx,
				)
			) if parent_hash == test_state.relay_parent && pov == pov_block_b && c == candidate_b => {
				assert_eq!(head_data, *expected_head_data);
				tx.send(Ok((
					ValidationResult::Valid,
					test_state.global_validation_schedule,
					test_state.local_validation_data,
				))).unwrap();
			}
		);

		// One erasure chunk is stored per validator before the candidate is
		// seconded.
		for _ in 0..test_state.validators.len() {
			assert_matches!(
				virtual_overseer.recv().await,
				AllMessages::AvailabilityStore(
					AvailabilityStoreMessage::StoreChunk(parent_hash, _, _)
				) if parent_hash == test_state.relay_parent
			);
		}

		// The subsystem seconds candidate B with our (validator 0) signature.
		assert_matches!(
			virtual_overseer.recv().await,
			AllMessages::StatementDistribution(
				StatementDistributionMessage::Share(
					parent_hash,
					signed_statement,
				)
			) if parent_hash == test_state.relay_parent => {
				signed_statement.check_signature(
					&test_state.signing_context,
					&test_state.validator_public[0],
				).unwrap();

				assert_eq!(*signed_statement.payload(), Statement::Seconded(candidate_b));
			}
		);

		virtual_overseer.send(FromOverseer::Signal(
			OverseerSignal::StopWork(test_state.relay_parent))
		).await;
	});
}

// Test that if we have already issued a statement (in this case `Invalid`) about a
// candidate we will not be issuing a `Seconded` statement on it.
#[test]
fn backing_multiple_statements_work() {
	let test_state = TestState::default();
	test_harness(test_state.keystore.clone(), |test_harness| async move {
		let TestHarness { mut virtual_overseer } = test_harness;

		// Bring the job up for `test_state.relay_parent`.
		test_startup(&mut virtual_overseer, &test_state).await;

		let pov_block = PoVBlock {
			block_data: BlockData(vec![42, 43, 44]),
		};

		let pov_block_hash = pov_block.hash();

		let candidate = AbridgedCandidateReceipt {
			parachain_index: test_state.chain_ids[0],
			relay_parent: test_state.relay_parent,
			pov_block_hash,
			..Default::default()
		};

		let candidate_hash = candidate.hash();

		// Validator 2 seconds the candidate.
		let signed_a = SignedFullStatement::sign(
			Statement::Seconded(candidate.clone()),
			&test_state.signing_context,
			2,
			&test_state.validators[2].pair().into(),
		);

		// Send in a `Statement` with a candidate.
		let statement = CandidateBackingMessage::Statement(
			test_state.relay_parent,
			signed_a.clone(),
		);

		virtual_overseer.send(FromOverseer::Communication{ msg: statement }).await;

		// Subsystem requests PoV and requests validation.
		assert_matches!(
			virtual_overseer.recv().await,
			AllMessages::PoVDistribution(
				PoVDistributionMessage::FetchPoV(relay_parent, _, tx)
			) => {
				assert_eq!(relay_parent, test_state.relay_parent);
				tx.send(Arc::new(pov_block.clone())).unwrap();
			}
		);

		let expected_head_data = test_state.head_data.get(&test_state.chain_ids[0]).unwrap();

		// Tell subsystem that this candidate is invalid.
		assert_matches!(
			virtual_overseer.recv().await,
			AllMessages::CandidateValidation(
				CandidateValidationMessage::Validate(
					relay_parent,
					candidate_recvd,
					head_data,
					pov,
					tx,
				)
			) => {
				assert_eq!(relay_parent, test_state.relay_parent);
				assert_eq!(candidate_recvd, candidate);
				assert_eq!(head_data, *expected_head_data);
				assert_eq!(pov, pov_block);
				tx.send(Ok((
					ValidationResult::Invalid,
					test_state.global_validation_schedule,
					test_state.local_validation_data,
				))).unwrap();
			}
		);

		// The invalid message is shared.
		assert_matches!(
			virtual_overseer.recv().await,
			AllMessages::StatementDistribution(
				StatementDistributionMessage::Share(
					relay_parent,
					signed_statement,
				)
			) => {
				assert_eq!(relay_parent, test_state.relay_parent);
				signed_statement.check_signature(
					&test_state.signing_context,
					&test_state.validator_public[0],
				).unwrap();

				assert_eq!(*signed_statement.payload(), Statement::Invalid(candidate_hash));
			}
		);

		// Ask subsystem to `Second` a candidate that already has a statement issued about.
		// This should emit no actions from subsystem.
		let second = CandidateBackingMessage::Second(
			test_state.relay_parent,
			candidate.clone(),
			pov_block.clone(),
		);

		virtual_overseer.send(FromOverseer::Communication{ msg: second }).await;

		// A fresh, unrelated candidate with its own PoV.
		let pov_to_second = PoVBlock {
			block_data: BlockData(vec![3, 2, 1]),
		};

		let pov_block_hash = pov_to_second.hash();

		let candidate_to_second = AbridgedCandidateReceipt {
			parachain_index: test_state.chain_ids[0],
			relay_parent: test_state.relay_parent,
			pov_block_hash,
			..Default::default()
		};

		let second = CandidateBackingMessage::Second(
			test_state.relay_parent,
			candidate_to_second.clone(),
			pov_to_second.clone(),
		);

		// In order to trigger _some_ actions from subsystem ask it to second another
		// candidate. The only reason to do so is to make sure that no actions were
		// triggered on the prev step.
		virtual_overseer.send(FromOverseer::Communication{ msg: second }).await;

		// The very next message must be a validation request for the NEW candidate's
		// PoV — proving the earlier `Second` request produced nothing.
		assert_matches!(
			virtual_overseer.recv().await,
			AllMessages::CandidateValidation(
				CandidateValidationMessage::Validate(
					relay_parent,
					_,
					_,
					pov,
					_,
				)
			) => {
				assert_eq!(relay_parent, test_state.relay_parent);
				assert_eq!(pov, pov_to_second);
			}
		);
	});
}
}