Unverified Commit d7684115, authored by Peter Goodspeed-Niklaus, committed by GitHub

mod subsystem-util (#1376)

* Add subsystem-util crate.

Start by moving the JobCanceler here.

* copy utility functions for requesting runtime data; generalize

* convert subsystem-util from crate to module in subsystem

The point of making a sub-crate is to ensure that only the necessary
parts of a program get compiled; if a dependent package needed only
subsystem-util, and not subsystem, then subsystem wouldn't need to
be compiled.

However, that will never happen: subsystem-util depends on
subsystem::messages, so subsystem will always be compiled.

Therefore, it makes more sense to add it as a module in the existing
crate than as a new and distinct crate.

* make runtime request sender type generic

* candidate backing subsystem uses util for api requests

* add struct Validator representing the local validator

This struct can be constructed when the local node is a validator;
the constructor fails otherwise. It stores a bit of local data, and
provides some utility methods.
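The shape of such a constructor-gated type can be sketched as follows. This is a std-only sketch, not the real `Validator` in subsystem util (which holds a signing key and a `SigningContext`); the `String` id, the `construct` signature, and the error string are illustrative stand-ins.

```rust
/// Hypothetical stand-in for the local validator identity.
/// Its existence proves the node is in the active validator set.
#[derive(Debug, Clone, PartialEq)]
struct Validator {
    id: String, // stand-in for ValidatorId
    index: u32, // position in the active validator set
}

impl Validator {
    /// Construction fails when our key is not in the validator set,
    /// mirroring the `Error::NotInValidatorSet` case in this commit.
    fn construct(validators: &[String], our_key: &str) -> Result<Validator, String> {
        validators
            .iter()
            .position(|v| v.as_str() == our_key)
            .map(|i| Validator { id: our_key.to_string(), index: i as u32 })
            .ok_or_else(|| "NotInValidatorSet".to_string())
    }

    fn id(&self) -> &str {
        &self.id
    }

    fn index(&self) -> u32 {
        self.index
    }
}
```

Callers that get a `Validator` back never need to re-check membership; the utility methods (here `id` and `index`) can assume it.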

* add alternate constructor for better efficiency

* refactor candidate backing to use utility methods

* fix test breakage caused by reordering tests

* restore test which accidentally got deleted during merge

* start extracting jobs management into helper traits + structs

* use util::{JobHandle, Jobs} in CandidateBackingSubsystem

* implement generic job-manager subsystem impl

This means that the work of implementing a subsystem boils down
to implementing the job, and then writing an appropriate
type definition, i.e.

pub type CandidateBackingSubsystem<Spawner, Context> =
	util::JobManager<Spawner, Context, CandidateBackingJob>;
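The pattern behind that typedef can be sketched like so. This is a deliberately simplified, synchronous, std-only sketch, not the real `util::JobTrait`/`util::JobManager` (which are async, message-driven, and spawner-generic); all names and signatures here are illustrative.

```rust
use std::collections::HashMap;
use std::marker::PhantomData;

/// Simplified sketch of a job trait: the real one is async and
/// exchanges ToJob/FromJob messages; here a job is just a function
/// of its relay parent.
trait JobTrait {
    const NAME: &'static str;
    fn run(parent: u64) -> Result<String, String>;
}

/// Generic manager: owns one job per relay-parent hash.
struct JobManager<J: JobTrait> {
    results: HashMap<u64, String>,
    _marker: PhantomData<J>,
}

impl<J: JobTrait> JobManager<J> {
    fn new() -> Self {
        JobManager { results: HashMap::new(), _marker: PhantomData }
    }

    /// "Spawning" a job here just runs it and records the result.
    fn spawn_job(&mut self, parent: u64) -> Result<(), String> {
        let out = J::run(parent)?;
        self.results.insert(parent, out);
        Ok(())
    }
}

/// Implementing a subsystem then boils down to implementing the job...
struct CandidateBackingJob;

impl JobTrait for CandidateBackingJob {
    const NAME: &'static str = "CandidateBackingJob";
    fn run(parent: u64) -> Result<String, String> {
        Ok(format!("backed candidates for relay parent {parent}"))
    }
}

/// ...plus the type alias, as in the commit message.
type CandidateBackingSubsystem = JobManager<CandidateBackingJob>;
```

The generic manager centralizes the per-relay-parent bookkeeping, so each subsystem only supplies its job logic.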

* add hash-extraction helper to messages

* fix errors caused by improper rebase

* doc improvement

* simplify conversion from overseer communication to job message

* document fn hash for all messages

* rename fn hash() -> fn relay_parent

* gracefully shut down running futures on Conclude

* ensure we're validating with the proper validator index

* rename: handle_unhashed_msg -> handle_orphan_msg

* impl Stream for Jobs<Spawner, Job>

This turns out to be relatively complicated and requires some
unsafe code, so we'll want either a detailed review or to revert
this commit.

* add missing documentation for public items

* use pin-project to eliminate unsafe code from this codebase

* rename SenderMessage -> FromJob

* reenvision the subsystem requests as an extension trait

This works within `util.rs`, but fails in `core/backing/src/lib.rs`,
because we don't actually create the struct soon enough. Continuing
down this path would imply substantial rewriting.

* Revert "reenvision the subsystem requests as an extension trait"

This reverts commit a5639e36.

The fact is, the new API is more complicated, with no real benefit.

* apply suggested futuresunordered join_all impl

* CandidateValidationMessage variants have no top-level relay parents

* rename handle_orphan_msg -> handle_unanchored_msg

* make most node-core-backing types private

Now the only public types exposed in that module are
CandidateBackingSubsystem and ToJob. While ideally we could reduce
the public interface to only the former type, that doesn't work
because ToJob appears in the public interface of CandidateBackingSubsystem.

This also involves changing the definition of CandidateBackingSubsystem;
it is no longer a typedef, but a struct wrapping the job manager.
parent 587c6858
Pipeline #100435 passed with stages in 26 minutes and 1 second
@@ -4488,8 +4488,6 @@ dependencies = [
"bitvec",
"derive_more 0.99.9",
"futures 0.3.5",
"futures-timer 3.0.2",
"log 0.4.8",
"polkadot-erasure-coding",
"polkadot-node-primitives",
"polkadot-node-subsystem",
@@ -4502,7 +4500,6 @@ dependencies = [
"sp-blockchain",
"sp-core",
"sp-keyring",
"streamunordered",
]
[[package]]
@@ -4547,11 +4544,19 @@ name = "polkadot-node-subsystem"
version = "0.1.0"
dependencies = [
"async-trait",
"derive_more 0.99.9",
"futures 0.3.5",
"futures-timer 3.0.2",
"log 0.4.8",
"parity-scale-codec",
"pin-project",
"polkadot-node-primitives",
"polkadot-primitives",
"polkadot-statement-table",
"sc-keystore",
"sc-network",
"sp-core",
"streamunordered",
]
[[package]]
......
@@ -6,20 +6,16 @@ edition = "2018"
[dependencies]
futures = "0.3.5"
log = "0.4.8"
sp-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-blockchain = { git = "https://github.com/paritytech/substrate", branch = "master" }
keystore = { package = "sc-keystore", git = "https://github.com/paritytech/substrate", branch = "master" }
primitives = { package = "sp-core", git = "https://github.com/paritytech/substrate", branch = "master" }
polkadot-primitives = { path = "../../../primitives" }
polkadot-node-primitives = { path = "../../primitives" }
polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" }
erasure-coding = { package = "polkadot-erasure-coding", path = "../../../erasure-coding" }
statement-table = { package = "polkadot-statement-table", path = "../../../statement-table" }
futures-timer = "3.0.2"
streamunordered = "0.5.1"
derive_more = "0.99.9"
bitvec = { version = "0.17.4", default-features = false, features = ["alloc"] }
......
@@ -16,45 +16,43 @@
//! Implements a `CandidateBackingSubsystem`.
#![recursion_limit="256"]
use std::collections::{HashMap, HashSet};
use std::convert::TryFrom;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use bitvec::vec::BitVec;
use log;
use futures::{
select, FutureExt, SinkExt, StreamExt,
channel::{oneshot, mpsc},
future::{self, Either},
task::{Spawn, SpawnError, SpawnExt},
channel::{mpsc, oneshot},
task::{Spawn, SpawnError},
Future, FutureExt, SinkExt, StreamExt,
};
use futures_timer::Delay;
use streamunordered::{StreamUnordered, StreamYield};
use primitives::Pair;
use keystore::KeyStorePtr;
use polkadot_primitives::v1::{
CommittedCandidateReceipt, BackedCandidate, Id as ParaId, ValidatorPair, ValidatorId,
CommittedCandidateReceipt, BackedCandidate, Id as ParaId, ValidatorId,
ValidatorIndex, SigningContext, PoV, OmittedValidationData,
CandidateDescriptor, AvailableData, ErasureChunk, ValidatorSignature, Hash, CandidateReceipt,
CandidateCommitments,
};
use polkadot_node_primitives::{
FromTableMisbehavior, Statement, SignedFullStatement, MisbehaviorReport, ValidationResult,
ValidationOutputs,
FromTableMisbehavior, Statement, SignedFullStatement, MisbehaviorReport,
ValidationOutputs, ValidationResult,
};
use polkadot_subsystem::{
FromOverseer, OverseerSignal, Subsystem, SubsystemContext, SpawnedSubsystem,
};
use polkadot_subsystem::messages::{
AllMessages, CandidateBackingMessage, CandidateSelectionMessage, SchedulerRoster,
RuntimeApiMessage, RuntimeApiRequest, CandidateValidationMessage, ValidationFailed,
StatementDistributionMessage, NewBackedCandidate, ProvisionerMessage, ProvisionableData,
PoVDistributionMessage, AvailabilityStoreMessage,
Subsystem, SubsystemContext, SpawnedSubsystem,
messages::{
AllMessages, AvailabilityStoreMessage, CandidateBackingMessage, CandidateSelectionMessage,
CandidateValidationMessage, NewBackedCandidate, PoVDistributionMessage, ProvisionableData,
ProvisionerMessage, RuntimeApiMessage, StatementDistributionMessage, ValidationFailed,
},
util::{
self,
request_signing_context,
request_validator_groups,
request_validators,
Validator,
},
};
use statement_table::{
generic::AttestedCandidate as TableAttestedCandidate,
@@ -68,9 +66,7 @@ use statement_table::{
#[derive(Debug, derive_more::From)]
enum Error {
NotInValidatorSet,
CandidateNotFound,
JobNotFound(Hash),
InvalidSignature,
#[from]
Erasure(erasure_coding::Error),
@@ -82,6 +78,8 @@ enum Error {
Mpsc(mpsc::SendError),
#[from]
Spawn(SpawnError),
#[from]
UtilError(util::Error),
}
/// Holds all data needed for candidate backing job operation.
@@ -92,7 +90,6 @@ struct CandidateBackingJob {
rx_to: mpsc::Receiver<ToJob>,
/// Outbound message channel sending part.
tx_from: mpsc::Sender<FromJob>,
/// The `ParaId`s assigned to this validator.
assignment: ParaId,
/// We have issued `Valid` or `Invalid` statements about these candidates.
@@ -101,7 +98,6 @@ struct CandidateBackingJob {
seconded: Option<Hash>,
/// We have already reported misbehaviors for these validators.
reported_misbehavior_for: HashSet<ValidatorIndex>,
table: Table<TableContext>,
table_context: TableContext,
}
@@ -113,7 +109,7 @@ const fn group_quorum(n_validators: usize) -> usize {
#[derive(Default)]
struct TableContext {
signing_context: SigningContext,
key: Option<ValidatorPair>,
validator: Option<Validator>,
groups: HashMap<ParaId, Vec<ValidatorIndex>>,
validators: Vec<ValidatorId>,
}
@@ -142,30 +138,40 @@ impl TableContextTrait for TableContext {
}
}
impl TableContext {
fn local_id(&self) -> Option<ValidatorId> {
self.key.as_ref().map(|k| k.public())
/// A message type that is sent from `CandidateBackingSubsystem` to `CandidateBackingJob`.
pub enum ToJob {
/// A `CandidateBackingMessage`.
CandidateBacking(CandidateBackingMessage),
/// Stop working.
Stop,
}
impl TryFrom<AllMessages> for ToJob {
type Error = ();
fn try_from(msg: AllMessages) -> Result<Self, Self::Error> {
match msg {
AllMessages::CandidateBacking(msg) => Ok(ToJob::CandidateBacking(msg)),
_ => Err(()),
}
}
}
fn local_index(&self) -> Option<ValidatorIndex> {
self.local_id().and_then(|id|
self.validators
.iter()
.enumerate()
.find(|(_, k)| k == &&id)
.map(|(i, _)| i as ValidatorIndex)
)
impl From<CandidateBackingMessage> for ToJob {
fn from(msg: CandidateBackingMessage) -> Self {
Self::CandidateBacking(msg)
}
}
const CHANNEL_CAPACITY: usize = 64;
impl util::ToJobTrait for ToJob {
const STOP: Self = ToJob::Stop;
/// A message type that is sent from `CandidateBackingSubsystem` to `CandidateBackingJob`.
enum ToJob {
/// A `CandidateBackingMessage`.
CandidateBacking(CandidateBackingMessage),
/// Stop working.
Stop,
fn relay_parent(&self) -> Option<Hash> {
match self {
Self::CandidateBacking(cb) => cb.relay_parent(),
Self::Stop => None,
}
}
}
/// A message type that is sent from `CandidateBackingJob` to `CandidateBackingSubsystem`.
@@ -193,6 +199,23 @@ impl From<FromJob> for AllMessages {
}
}
impl TryFrom<AllMessages> for FromJob {
type Error = &'static str;
fn try_from(f: AllMessages) -> Result<Self, Self::Error> {
match f {
AllMessages::AvailabilityStore(msg) => Ok(FromJob::AvailabilityStore(msg)),
AllMessages::RuntimeApi(msg) => Ok(FromJob::RuntimeApiMessage(msg)),
AllMessages::CandidateValidation(msg) => Ok(FromJob::CandidateValidation(msg)),
AllMessages::CandidateSelection(msg) => Ok(FromJob::CandidateSelection(msg)),
AllMessages::StatementDistribution(msg) => Ok(FromJob::StatementDistribution(msg)),
AllMessages::PoVDistribution(msg) => Ok(FromJob::PoVDistribution(msg)),
AllMessages::Provisioner(msg) => Ok(FromJob::Provisioner(msg)),
_ => Err("can't convert this AllMessages variant to FromJob"),
}
}
}
// It looks like it's not possible to do an `impl From` given the current state of
// the code. So this does the necessary conversion.
fn primitive_statement_to_table(s: &SignedFullStatement) -> TableSignedStatement {
@@ -209,19 +232,9 @@ fn primitive_statement_to_table(s: &SignedFullStatement) -> TableSignedStatement
}
}
// finds the first key we are capable of signing with out of the given set of validators,
// if any.
fn signing_key(validators: &[ValidatorId], keystore: &KeyStorePtr) -> Option<ValidatorPair> {
let keystore = keystore.read();
validators.iter()
.find_map(|v| {
keystore.key_pair::<ValidatorPair>(&v).ok()
})
}
impl CandidateBackingJob {
/// Run asynchronously.
async fn run(mut self) -> Result<(), Error> {
async fn run_loop(mut self) -> Result<(), Error> {
while let Some(msg) = self.rx_to.next().await {
match msg {
ToJob::CandidateBacking(msg) => {
@@ -328,9 +341,7 @@ impl CandidateBackingJob {
None => continue,
};
let mut validator_indices = BitVec::with_capacity(
group.len()
);
let mut validator_indices = BitVec::with_capacity(group.len());
validator_indices.resize(group.len(), false);
@@ -371,7 +382,7 @@ impl CandidateBackingJob {
if let Ok(report) = MisbehaviorReport::try_from(f) {
let message = ProvisionerMessage::ProvisionableData(
ProvisionableData::MisbehaviorReport(self.parent, report)
ProvisionableData::MisbehaviorReport(self.parent, report),
);
reports.push(message);
@@ -513,18 +524,7 @@ impl CandidateBackingJob {
}
fn sign_statement(&self, statement: Statement) -> Option<SignedFullStatement> {
let local_index = self.table_context.local_index()?;
let signing_key = self.table_context.key.as_ref()?;
let signed_statement = SignedFullStatement::sign(
statement,
&self.table_context.signing_context,
local_index,
signing_key,
);
Some(signed_statement)
Some(self.table_context.validator.as_ref()?.sign(statement))
}
fn check_statement_signature(&self, statement: &SignedFullStatement) -> Result<(), Error> {
@@ -657,329 +657,133 @@ impl CandidateBackingJob {
}
}
struct JobHandle {
abort_handle: future::AbortHandle,
to_job: mpsc::Sender<ToJob>,
finished: oneshot::Receiver<()>,
su_handle: usize,
}
impl util::JobTrait for CandidateBackingJob {
type ToJob = ToJob;
type FromJob = FromJob;
type Error = Error;
type RunArgs = KeyStorePtr;
impl JobHandle {
async fn stop(mut self) {
let _ = self.to_job.send(ToJob::Stop).await;
let stop_timer = Delay::new(Duration::from_secs(1));
match future::select(stop_timer, self.finished).await {
Either::Left((_, _)) => {
},
Either::Right((_, _)) => {
self.abort_handle.abort();
},
}
}
const NAME: &'static str = "CandidateBackingJob";
async fn send_msg(&mut self, msg: ToJob) -> Result<(), Error> {
Ok(self.to_job.send(msg).await?)
}
}
struct Jobs<S> {
spawner: S,
running: HashMap<Hash, JobHandle>,
outgoing_msgs: StreamUnordered<mpsc::Receiver<FromJob>>,
}
async fn run_job(
parent: Hash,
keystore: KeyStorePtr,
rx_to: mpsc::Receiver<ToJob>,
mut tx_from: mpsc::Sender<FromJob>,
) -> Result<(), Error> {
let (validators, roster) = futures::try_join!(
request_validators(parent, &mut tx_from).await?,
request_validator_groups(parent, &mut tx_from).await?,
)?;
let key = signing_key(&validators[..], &keystore).ok_or(Error::NotInValidatorSet)?;
let mut groups = HashMap::new();
for assignment in roster.scheduled {
if let Some(g) = roster.validator_groups.get(assignment.group_idx.0 as usize) {
groups.insert(
assignment.para_id,
g.clone(),
);
}
}
let mut assignment = Default::default();
if let Some(idx) = validators.iter().position(|k| *k == key.public()) {
let idx = idx as u32;
for (para_id, group) in groups.iter() {
if group.contains(&idx) {
assignment = *para_id;
break;
fn run(
parent: Hash,
keystore: KeyStorePtr,
rx_to: mpsc::Receiver<Self::ToJob>,
mut tx_from: mpsc::Sender<Self::FromJob>,
) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>> {
async move {
let (validators, roster, signing_context) = futures::try_join!(
request_validators(parent, &mut tx_from).await?,
request_validator_groups(parent, &mut tx_from).await?,
request_signing_context(parent, &mut tx_from).await?,
)?;
let validator = Validator::construct(&validators, signing_context, keystore.clone())?;
let mut groups = HashMap::new();
for assignment in roster.scheduled {
if let Some(g) = roster.validator_groups.get(assignment.group_idx.0 as usize) {
groups.insert(assignment.para_id, g.clone());
}
}
}
}
let signing_context = request_signing_context(parent, &mut tx_from).await?.await?;
let table_context = TableContext {
signing_context,
key: Some(key),
groups,
validators,
};
let job = CandidateBackingJob {
parent,
rx_to,
tx_from,
assignment,
issued_statements: HashSet::new(),
seconded: None,
reported_misbehavior_for: HashSet::new(),
table: Table::default(),
table_context,
};
job.run().await
}
/// Request a validator set from the `RuntimeApi`.
async fn request_validators(
parent: Hash,
s: &mut mpsc::Sender<FromJob>,
) -> Result<oneshot::Receiver<Vec<ValidatorId>>, Error> {
let (tx, rx) = oneshot::channel();
s.send(FromJob::RuntimeApiMessage(RuntimeApiMessage::Request(
parent,
RuntimeApiRequest::Validators(tx),
)
)).await?;
let mut assignment = Default::default();
Ok(rx)
}
/// Request the scheduler roster from `RuntimeApi`.
async fn request_validator_groups(
parent: Hash,
s: &mut mpsc::Sender<FromJob>,
) -> Result<oneshot::Receiver<SchedulerRoster>, Error> {
let (tx, rx) = oneshot::channel();
s.send(FromJob::RuntimeApiMessage(RuntimeApiMessage::Request(
parent,
RuntimeApiRequest::ValidatorGroups(tx),
)
)).await?;
Ok(rx)
}
/// Request a `SigningContext` from the `RuntimeApi`.
async fn request_signing_context(
parent: Hash,
s: &mut mpsc::Sender<FromJob>,
) -> Result<oneshot::Receiver<SigningContext>, Error> {
let (tx, rx) = oneshot::channel();
s.send(FromJob::RuntimeApiMessage(RuntimeApiMessage::Request(
parent,
RuntimeApiRequest::SigningContext(tx),
)
)).await?;
Ok(rx)
}
impl<S: Spawn> Jobs<S> {
fn new(spawner: S) -> Self {
Self {
spawner,
running: HashMap::default(),
outgoing_msgs: StreamUnordered::new(),
}
}
fn spawn_job(&mut self, parent_hash: Hash, keystore: KeyStorePtr) -> Result<(), Error> {
let (to_job_tx, to_job_rx) = mpsc::channel(CHANNEL_CAPACITY);
let (from_job_tx, from_job_rx) = mpsc::channel(CHANNEL_CAPACITY);
let (future, abort_handle) = future::abortable(async move {
if let Err(e) = run_job(parent_hash, keystore, to_job_rx, from_job_tx).await {
log::error!(
"CandidateBackingJob({}) finished with an error {:?}",
parent_hash,
e,
);
if let Some(idx) = validators.iter().position(|k| *k == validator.id()) {
let idx = idx as u32;
for (para_id, group) in groups.iter() {
if group.contains(&idx) {
assignment = *para_id;
break;
}
}
}
});
let (finished_tx, finished) = oneshot::channel();
let future = async move {
let _ = future.await;
let _ = finished_tx.send(());
};
self.spawner.spawn(future)?;
let su_handle = self.outgoing_msgs.push(from_job_rx);
let handle = JobHandle {
abort_handle,
to_job: to_job_tx,
finished,
su_handle,
};
self.running.insert(parent_hash, handle);
Ok(())
}
let table_context = TableContext {
groups,
validators,
signing_context: validator.signing_context().clone(),
validator: Some(validator),
};
async fn stop_job(&mut self, parent_hash: Hash) -> Result<(), Error> {
match self.running.remove(&parent_hash) {
Some(handle) => {
Pin::new(&mut self.outgoing_msgs).remove(handle.su_handle);
handle.stop().await;
Ok(())
}
None => Err(Error::JobNotFound(parent_hash))
}
}
let job = CandidateBackingJob {
parent,
rx_to,
tx_from,
assignment,
issued_statements: HashSet::new(),
seconded: None,
reported_misbehavior_for: HashSet::new(),
table: Table::default(),
table_context,
};
async fn send_msg(&mut self, parent_hash: Hash, msg: ToJob) -> Result<(), Error> {
if let Some(job) = self.running.get_mut(&parent_hash) {
job.send_msg(msg).await?;
job.run_loop().await
}
Ok(())
}
async fn next(&mut self) -> Option<FromJob> {
self.outgoing_msgs.next().await.and_then(|(e, _)| match e {
StreamYield::Item(e) => Some(e),
_ => None,
})
.boxed()
}
}
/// Manager type for the CandidateBackingSubsystem
type Manager<Spawner, Context> = util::JobManager<Spawner, Context, CandidateBackingJob>;
/// An implementation of the Candidate Backing subsystem.
pub struct CandidateBackingSubsystem<S, Context> {
spawner: S,
keystore: KeyStorePtr,
_context: std::marker::PhantomData<Context>,
pub struct CandidateBackingSubsystem<Spawner, Context> {
manager: Manager<Spawner, Context>,
}
impl<S, Context> CandidateBackingSubsystem<S, Context>
where
S: Spawn + Clone,
Context: SubsystemContext<Message=CandidateBackingMessage>,
impl<Spawner, Context> CandidateBackingSubsystem<Spawner, Context>
where
Spawner: Clone + Spawn + Send + Unpin,
Context: SubsystemContext,
ToJob: From<<Context as SubsystemContext>::Message>,
{
/// Creates a new `CandidateBackingSubsystem`.
pub fn new(keystore: KeyStorePtr, spawner: S) -> Self {
Self {
spawner,
keystore,
_context: std::marker::PhantomData,
pub fn new(spawner: Spawner, keystore: KeyStorePtr) -> Self {
CandidateBackingSubsystem {
manager: util::JobManager::new(spawner, keystore)
}
}
async fn run(