Unverified Commit a39d8803 authored by Peter Goodspeed-Niklaus's avatar Peter Goodspeed-Niklaus Committed by GitHub
Browse files

implement provisioner (#1473)

* sketch out provisioner basics

* handle provisionable data

* stub out select_inherent_data

* split runtime APIs into sub-chapters to improve linkability

* explain SignedAvailabilityBitfield semantics

* add internal link to further documentation

* some more work figuring out how the provisioner can do its thing

* fix broken link

* don't import enum variants where it's one layer deep

* make request_availability_cores a free fn in util

* document more precisely what should happen on block production

* finish first-draft implementation of provisioner

* start working on the full and proper backed candidate selection rule

* Pass number of block under construction via RequestInherentData

* Revert "Pass number of block under construction via RequestInherentData"

This reverts commit 850fe62c.

That initially looked like the better approach--it spent the time
budget for fetching the block number in the proposer, instead of
the provisioner, and that felt more appropriate--but it turns out
not to be obvious how to get the block number of the block under
construction from within the proposer. The Chain API may be less
ideal, but it should be easier to implement.

* wip: get the block under production from the Chain API

* add ChainApiMessage to AllMessages

* don't break the run loop if a provisionable data channel closes

* clone only those backed candidates which are coherent

* propagate chain_api subsystem through various locations

* add delegated_subsystem! macro to ease delegating subsystems

Unfortunately, it doesn't work right:

```
error[E0446]: private type `CandidateBackingJob` in public interface
   --> node/core/backing/src/lib.rs:775:1
    |
86  | struct CandidateBackingJob {
    | - `CandidateBackingJob` declared as private
...
775 | delegated_subsystem!(CandidateBackingJob as CandidateBackingSubsystem);
    | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ can't leak private type
```

I'm not sure precisely what's going wrong, here; I suspect the problem is
the use of `$job as JobTrait>::RunArgs` and `::ToJob`; the failure would be
that it's not reifying the types to verify that the actual types are public,
but instead referring to them via `CandidateBackingJob`, which is in fact private;
that privacy is the point.

Going to see if I can generic my way out of this, but we may be headed for a
quick revert here.

* fix delegated_subsystem

The invocation is a bit more verbose than I'd prefer, but it's also
more explicit about what types need to be public. I'll take it as a win.

* add provisioning subsystem; reduce public interface of provisioner

* deny missing docs in provisioner

* refactor core selection per code review suggestion

This is twice as much code when measured by line, but IMO it is
in fact somewhat clearer to read, so overall a win.

Also adds an improved rule for selecting availability bitfields,
which (unlike the previous implementation) guarantees that the
appropriate postconditions hold there.

* fix bad merge double-declaration

* update guide with (hopefully) complete provisioner candidate selection procedure

* clarify candidate selection algorithm

* Revert "clarify candidate selection algorithm"

This reverts commit c68a02ac.

* clarify candidate selection algorithm

* update provisioner to implement candidate selection per the guide

* add test that no more than one bitfield is selected per validator

* add test that each selected bitfield corresponds to an occupied core

* add test that more set bits win conflicts

* add macro for specializing runtime requests; specailize all runtime requests

* add tests harness for select_candidates tests

* add first real select_candidates test, fix test_harness

* add mock overseer and test that success is possible

* add test that the candidate selection algorithm picks the right ones

* make candidate selection test somewhat more stringent
parent 864fff12
Pipeline #103263 passed with stages
in 22 minutes and 43 seconds
...@@ -4313,7 +4313,7 @@ checksum = "feb3b2b1033b8a60b4da6ee470325f887758c95d5320f52f9ce0df055a55940e" ...@@ -4313,7 +4313,7 @@ checksum = "feb3b2b1033b8a60b4da6ee470325f887758c95d5320f52f9ce0df055a55940e"
[[package]] [[package]]
name = "polkadot" name = "polkadot"
version = "0.8.22" version = "0.8.19"
dependencies = [ dependencies = [
"assert_cmd", "assert_cmd",
"futures 0.3.5", "futures 0.3.5",
...@@ -4633,6 +4633,21 @@ dependencies = [ ...@@ -4633,6 +4633,21 @@ dependencies = [
"wasm-timer", "wasm-timer",
] ]
[[package]]
name = "polkadot-node-core-provisioner"
version = "0.1.0"
dependencies = [
"bitvec",
"derive_more 0.99.9",
"futures 0.3.5",
"lazy_static",
"log 0.4.8",
"polkadot-node-subsystem",
"polkadot-primitives",
"sp-core",
"tokio 0.2.21",
]
[[package]] [[package]]
name = "polkadot-node-core-runtime-api" name = "polkadot-node-core-runtime-api"
version = "0.1.0" version = "0.1.0"
......
...@@ -4,7 +4,7 @@ path = "src/main.rs" ...@@ -4,7 +4,7 @@ path = "src/main.rs"
[package] [package]
name = "polkadot" name = "polkadot"
version = "0.8.22" version = "0.8.19"
authors = ["Parity Technologies <admin@parity.io>"] authors = ["Parity Technologies <admin@parity.io>"]
edition = "2018" edition = "2018"
...@@ -51,6 +51,7 @@ members = [ ...@@ -51,6 +51,7 @@ members = [
"node/core/candidate-validation", "node/core/candidate-validation",
"node/core/chain-api", "node/core/chain-api",
"node/core/proposer", "node/core/proposer",
"node/core/provisioner",
"node/core/runtime-api", "node/core/runtime-api",
"node/network/bridge", "node/network/bridge",
"node/network/pov-distribution", "node/network/pov-distribution",
......
...@@ -36,10 +36,9 @@ use polkadot_primitives::v1::{ ...@@ -36,10 +36,9 @@ use polkadot_primitives::v1::{
}; };
use polkadot_node_primitives::{ use polkadot_node_primitives::{
FromTableMisbehavior, Statement, SignedFullStatement, MisbehaviorReport, FromTableMisbehavior, Statement, SignedFullStatement, MisbehaviorReport,
ValidationOutputs, ValidationResult, SpawnNamed, ValidationOutputs, ValidationResult,
}; };
use polkadot_subsystem::{ use polkadot_subsystem::{
Subsystem, SubsystemContext, SpawnedSubsystem,
messages::{ messages::{
AllMessages, AvailabilityStoreMessage, CandidateBackingMessage, CandidateSelectionMessage, AllMessages, AvailabilityStoreMessage, CandidateBackingMessage, CandidateSelectionMessage,
CandidateValidationMessage, NewBackedCandidate, PoVDistributionMessage, ProvisionableData, CandidateValidationMessage, NewBackedCandidate, PoVDistributionMessage, ProvisionableData,
...@@ -54,6 +53,7 @@ use polkadot_subsystem::{ ...@@ -54,6 +53,7 @@ use polkadot_subsystem::{
request_from_runtime, request_from_runtime,
Validator, Validator,
}, },
delegated_subsystem,
}; };
use statement_table::{ use statement_table::{
generic::AttestedCandidate as TableAttestedCandidate, generic::AttestedCandidate as TableAttestedCandidate,
...@@ -772,45 +772,7 @@ impl util::JobTrait for CandidateBackingJob { ...@@ -772,45 +772,7 @@ impl util::JobTrait for CandidateBackingJob {
} }
} }
/// Manager type for the CandidateBackingSubsystem delegated_subsystem!(CandidateBackingJob(KeyStorePtr) <- ToJob as CandidateBackingSubsystem);
type Manager<Spawner, Context> = util::JobManager<Spawner, Context, CandidateBackingJob>;
/// An implementation of the Candidate Backing subsystem.
pub struct CandidateBackingSubsystem<Spawner, Context> {
manager: Manager<Spawner, Context>,
}
impl<Spawner, Context> CandidateBackingSubsystem<Spawner, Context>
where
Spawner: Clone + SpawnNamed + Send + Unpin,
Context: SubsystemContext,
ToJob: From<<Context as SubsystemContext>::Message>,
{
/// Creates a new `CandidateBackingSubsystem`.
pub fn new(spawner: Spawner, keystore: KeyStorePtr) -> Self {
CandidateBackingSubsystem {
manager: util::JobManager::new(spawner, keystore)
}
}
/// Run this subsystem
pub async fn run(ctx: Context, keystore: KeyStorePtr, spawner: Spawner) {
<Manager<Spawner, Context>>::run(ctx, keystore, spawner, None).await
}
}
impl<Spawner, Context> Subsystem<Context> for CandidateBackingSubsystem<Spawner, Context>
where
Spawner: SpawnNamed + Send + Clone + Unpin + 'static,
Context: SubsystemContext,
<Context as SubsystemContext>::Message: Into<ToJob>,
{
fn start(self, ctx: Context) -> SpawnedSubsystem {
self.manager.start(ctx)
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
......
[package]
name = "polkadot-node-core-provisioner"
version = "0.1.0"
authors = ["Parity Technologies <admin@parity.io>"]
edition = "2018"
[dependencies]
bitvec = { version = "0.17.4", default-features = false, features = ["alloc"] }
derive_more = "0.99.9"
futures = "0.3.5"
log = "0.4.8"
polkadot-primitives = { path = "../../../primitives" }
polkadot-node-subsystem = { path = "../../subsystem" }
[dev-dependencies]
lazy_static = "1.4"
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
tokio = "0.2"
This diff is collapsed.
...@@ -83,7 +83,7 @@ impl EncodeAs<CompactStatement> for Statement { ...@@ -83,7 +83,7 @@ impl EncodeAs<CompactStatement> for Statement {
pub type SignedFullStatement = Signed<Statement, CompactStatement>; pub type SignedFullStatement = Signed<Statement, CompactStatement>;
/// A misbehaviour report. /// A misbehaviour report.
#[derive(Debug)] #[derive(Debug, Clone)]
pub enum MisbehaviorReport { pub enum MisbehaviorReport {
/// These validator nodes disagree on this candidate's validity, please figure it out /// These validator nodes disagree on this candidate's validity, please figure it out
/// ///
......
...@@ -407,7 +407,8 @@ impl StatementDistributionMessage { ...@@ -407,7 +407,8 @@ impl StatementDistributionMessage {
} }
/// This data becomes intrinsics or extrinsics which should be included in a future relay chain block. /// This data becomes intrinsics or extrinsics which should be included in a future relay chain block.
#[derive(Debug)] // It needs to be cloneable because multiple potential block authors can request copies.
#[derive(Debug, Clone)]
pub enum ProvisionableData { pub enum ProvisionableData {
/// This bitfield indicates the availability of various candidate blocks. /// This bitfield indicates the availability of various candidate blocks.
Bitfield(Hash, SignedAvailabilityBitfield), Bitfield(Hash, SignedAvailabilityBitfield),
...@@ -488,8 +489,6 @@ pub enum AllMessages { ...@@ -488,8 +489,6 @@ pub enum AllMessages {
CandidateBacking(CandidateBackingMessage), CandidateBacking(CandidateBackingMessage),
/// Message for the candidate selection subsystem. /// Message for the candidate selection subsystem.
CandidateSelection(CandidateSelectionMessage), CandidateSelection(CandidateSelectionMessage),
/// Message for the Chain API subsystem.
ChainApi(ChainApiMessage),
/// Message for the statement distribution subsystem. /// Message for the statement distribution subsystem.
StatementDistribution(StatementDistributionMessage), StatementDistribution(StatementDistributionMessage),
/// Message for the availability distribution subsystem. /// Message for the availability distribution subsystem.
...@@ -508,6 +507,8 @@ pub enum AllMessages { ...@@ -508,6 +507,8 @@ pub enum AllMessages {
AvailabilityStore(AvailabilityStoreMessage), AvailabilityStore(AvailabilityStoreMessage),
/// Message for the network bridge subsystem. /// Message for the network bridge subsystem.
NetworkBridge(NetworkBridgeMessage), NetworkBridge(NetworkBridgeMessage),
/// Message for the Chain API subsystem
ChainApi(ChainApiMessage),
/// Test message /// Test message
/// ///
/// This variant is only valid while testing, but makes the process of testing the /// This variant is only valid while testing, but makes the process of testing the
......
...@@ -21,10 +21,8 @@ ...@@ -21,10 +21,8 @@
//! this module. //! this module.
use crate::{ use crate::{
messages::{
AllMessages, RuntimeApiMessage, RuntimeApiRequest, RuntimeApiSender,
},
errors::{ChainApiError, RuntimeApiError}, errors::{ChainApiError, RuntimeApiError},
messages::{AllMessages, RuntimeApiMessage, RuntimeApiRequest, RuntimeApiSender},
FromOverseer, SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemError, SubsystemResult, FromOverseer, SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemError, SubsystemResult,
}; };
use futures::{ use futures::{
...@@ -40,13 +38,12 @@ use keystore::KeyStorePtr; ...@@ -40,13 +38,12 @@ use keystore::KeyStorePtr;
use parity_scale_codec::Encode; use parity_scale_codec::Encode;
use pin_project::{pin_project, pinned_drop}; use pin_project::{pin_project, pinned_drop};
use polkadot_primitives::v1::{ use polkadot_primitives::v1::{
EncodeAs, Hash, Signed, SigningContext, SessionIndex, CandidateEvent, CommittedCandidateReceipt, CoreState, EncodeAs, GlobalValidationData,
ValidatorId, ValidatorIndex, ValidatorPair, GroupRotationInfo, GroupRotationInfo, Hash, Id as ParaId, LocalValidationData, OccupiedCoreAssumption,
}; SessionIndex, Signed, SigningContext, ValidationCode, ValidatorId, ValidatorIndex,
use sp_core::{ ValidatorPair,
Pair,
traits::SpawnNamed,
}; };
use sp_core::Pair;
use std::{ use std::{
collections::HashMap, collections::HashMap,
convert::{TryFrom, TryInto}, convert::{TryFrom, TryInto},
...@@ -56,6 +53,11 @@ use std::{ ...@@ -56,6 +53,11 @@ use std::{
}; };
use streamunordered::{StreamUnordered, StreamYield}; use streamunordered::{StreamUnordered, StreamYield};
/// This reexport is required so that external crates can use the `delegated_subsystem` macro properly.
///
/// Otherwise, downstream crates might have to modify their `Cargo.toml` to ensure `sp-core` appeared there.
pub use sp_core::traits::SpawnNamed;
/// Duration a job will wait after sending a stop signal before hard-aborting. /// Duration a job will wait after sending a stop signal before hard-aborting.
pub const JOB_GRACEFUL_STOP_DURATION: Duration = Duration::from_secs(1); pub const JOB_GRACEFUL_STOP_DURATION: Duration = Duration::from_secs(1);
/// Capacity of channels to and from individual jobs /// Capacity of channels to and from individual jobs
...@@ -119,42 +121,67 @@ where ...@@ -119,42 +121,67 @@ where
Ok(rx) Ok(rx)
} }
/// Request a validator set from the `RuntimeApi`. /// Construct specialized request functions for the runtime.
pub async fn request_validators<FromJob>( ///
parent: Hash, /// These would otherwise get pretty repetitive.
s: &mut mpsc::Sender<FromJob>, macro_rules! specialize_requests {
) -> Result<RuntimeApiReceiver<Vec<ValidatorId>>, Error> // expand return type name for documentation purposes
where (fn $func_name:ident( $( $param_name:ident : $param_ty:ty ),* ) -> $return_ty:ty ; $request_variant:ident;) => {
FromJob: TryFrom<AllMessages>, specialize_requests!{
<FromJob as TryFrom<AllMessages>>::Error: std::fmt::Debug, named stringify!($request_variant) ; fn $func_name( $( $param_name : $param_ty ),* ) -> $return_ty ; $request_variant;
{ }
request_from_runtime(parent, s, |tx| RuntimeApiRequest::Validators(tx)).await };
}
/// Request the validator groups. // create a single specialized request function
pub async fn request_validator_groups<FromJob>( (named $doc_name:expr ; fn $func_name:ident( $( $param_name:ident : $param_ty:ty ),* ) -> $return_ty:ty ; $request_variant:ident;) => {
parent: Hash, #[doc = "Request `"]
s: &mut mpsc::Sender<FromJob>, #[doc = $doc_name]
) -> Result<RuntimeApiReceiver<(Vec<Vec<ValidatorIndex>>, GroupRotationInfo)>, Error> #[doc = "` from the runtime"]
where pub async fn $func_name<FromJob>(
FromJob: TryFrom<AllMessages>, parent: Hash,
<FromJob as TryFrom<AllMessages>>::Error: std::fmt::Debug, $(
{ $param_name: $param_ty,
request_from_runtime(parent, s, |tx| RuntimeApiRequest::ValidatorGroups(tx)).await )*
sender: &mut mpsc::Sender<FromJob>,
) -> Result<RuntimeApiReceiver<$return_ty>, Error>
where
FromJob: TryFrom<AllMessages>,
<FromJob as TryFrom<AllMessages>>::Error: std::fmt::Debug,
{
request_from_runtime(parent, sender, |tx| RuntimeApiRequest::$request_variant(
$( $param_name, )* tx
)).await
}
};
// recursive decompose
(
fn $func_name:ident( $( $param_name:ident : $param_ty:ty ),* ) -> $return_ty:ty ; $request_variant:ident;
$(
fn $t_func_name:ident( $( $t_param_name:ident : $t_param_ty:ty ),* ) -> $t_return_ty:ty ; $t_request_variant:ident;
)+
) => {
specialize_requests!{
fn $func_name( $( $param_name : $param_ty ),* ) -> $return_ty ; $request_variant ;
}
specialize_requests!{
$(
fn $t_func_name( $( $t_param_name : $t_param_ty ),* ) -> $t_return_ty ; $t_request_variant ;
)+
}
};
} }
/// Request the session index of the child block. specialize_requests! {
pub async fn request_session_index_for_child<FromJob>( fn request_validators() -> Vec<ValidatorId>; Validators;
parent: Hash, fn request_validator_groups() -> (Vec<Vec<ValidatorIndex>>, GroupRotationInfo); ValidatorGroups;
s: &mut mpsc::Sender<FromJob>, fn request_availability_cores() -> Vec<CoreState>; AvailabilityCores;
) -> Result<RuntimeApiReceiver<SessionIndex>, Error> fn request_global_validation_data() -> GlobalValidationData; GlobalValidationData;
where fn request_local_validation_data(para_id: ParaId, assumption: OccupiedCoreAssumption) -> Option<LocalValidationData>; LocalValidationData;
FromJob: TryFrom<AllMessages>, fn request_session_index_for_child() -> SessionIndex; SessionIndexForChild;
<FromJob as TryFrom<AllMessages>>::Error: std::fmt::Debug, fn request_validation_code(para_id: ParaId, assumption: OccupiedCoreAssumption) -> Option<ValidationCode>; ValidationCode;
{ fn request_candidate_pending_availability(para_id: ParaId) -> Option<CommittedCandidateReceipt>; CandidatePendingAvailability;
request_from_runtime(parent, s, |tx| { fn request_candidate_events() -> Vec<CandidateEvent>; CandidateEvents;
RuntimeApiRequest::SessionIndexForChild(tx)
}).await
} }
/// From the given set of validators, find the first key we can sign with, if any. /// From the given set of validators, find the first key we can sign with, if any.
...@@ -405,8 +432,13 @@ impl<Spawner: SpawnNamed, Job: 'static + JobTrait> Jobs<Spawner, Job> { ...@@ -405,8 +432,13 @@ impl<Spawner: SpawnNamed, Job: 'static + JobTrait> Jobs<Spawner, Job> {
/// the error is forwarded onto the provided channel. /// the error is forwarded onto the provided channel.
/// ///
/// Errors if the error channel already exists. /// Errors if the error channel already exists.
pub fn forward_errors(&mut self, tx: mpsc::Sender<(Option<Hash>, JobsError<Job::Error>)>) -> Result<(), Error> { pub fn forward_errors(
if self.errors.is_some() { return Err(Error::AlreadyForwarding) } &mut self,
tx: mpsc::Sender<(Option<Hash>, JobsError<Job::Error>)>,
) -> Result<(), Error> {
if self.errors.is_some() {
return Err(Error::AlreadyForwarding);
}
self.errors = Some(tx); self.errors = Some(tx);
Ok(()) Ok(())
} }
...@@ -510,13 +542,12 @@ where ...@@ -510,13 +542,12 @@ where
fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context) -> task::Poll<Option<Self::Item>> { fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context) -> task::Poll<Option<Self::Item>> {
// pin-project the outgoing messages // pin-project the outgoing messages
self.project() self.project().outgoing_msgs.poll_next(cx).map(|opt| {
.outgoing_msgs opt.and_then(|(stream_yield, _)| match stream_yield {
.poll_next(cx)
.map(|opt| opt.and_then(|(stream_yield, _)| match stream_yield {
StreamYield::Item(msg) => Some(msg), StreamYield::Item(msg) => Some(msg),
StreamYield::Finished(_) => None, StreamYield::Finished(_) => None,
})) })
})
} }
} }
...@@ -559,8 +590,13 @@ where ...@@ -559,8 +590,13 @@ where
/// the error is forwarded onto the provided channel. /// the error is forwarded onto the provided channel.
/// ///
/// Errors if the error channel already exists. /// Errors if the error channel already exists.
pub fn forward_errors(&mut self, tx: mpsc::Sender<(Option<Hash>, JobsError<Job::Error>)>) -> Result<(), Error> { pub fn forward_errors(
if self.errors.is_some() { return Err(Error::AlreadyForwarding) } &mut self,
tx: mpsc::Sender<(Option<Hash>, JobsError<Job::Error>)>,
) -> Result<(), Error> {
if self.errors.is_some() {
return Err(Error::AlreadyForwarding);
}
self.errors = Some(tx); self.errors = Some(tx);
Ok(()) Ok(())
} }
...@@ -576,10 +612,16 @@ where ...@@ -576,10 +612,16 @@ where
/// ///
/// If `err_tx` is not `None`, errors are forwarded onto that channel as they occur. /// If `err_tx` is not `None`, errors are forwarded onto that channel as they occur.
/// Otherwise, most are logged and then discarded. /// Otherwise, most are logged and then discarded.
pub async fn run(mut ctx: Context, run_args: Job::RunArgs, spawner: Spawner, mut err_tx: Option<mpsc::Sender<(Option<Hash>, JobsError<Job::Error>)>>) { pub async fn run(
mut ctx: Context,
run_args: Job::RunArgs,
spawner: Spawner,
mut err_tx: Option<mpsc::Sender<(Option<Hash>, JobsError<Job::Error>)>>,
) {
let mut jobs = Jobs::new(spawner.clone()); let mut jobs = Jobs::new(spawner.clone());
if let Some(ref err_tx) = err_tx { if let Some(ref err_tx) = err_tx {
jobs.forward_errors(err_tx.clone()).expect("we never call this twice in this context; qed"); jobs.forward_errors(err_tx.clone())
.expect("we never call this twice in this context; qed");
} }
loop { loop {
...@@ -592,7 +634,11 @@ where ...@@ -592,7 +634,11 @@ where
} }
// if we have a channel on which to forward errors, do so // if we have a channel on which to forward errors, do so
async fn fwd_err(hash: Option<Hash>, err: JobsError<Job::Error>, err_tx: &mut Option<mpsc::Sender<(Option<Hash>, JobsError<Job::Error>)>>) { async fn fwd_err(
hash: Option<Hash>,
err: JobsError<Job::Error>,
err_tx: &mut Option<mpsc::Sender<(Option<Hash>, JobsError<Job::Error>)>>,
) {
if let Some(err_tx) = err_tx { if let Some(err_tx) = err_tx {
// if we can't send on the error transmission channel, we can't do anything useful about it // if we can't send on the error transmission channel, we can't do anything useful about it
// still, we can at least log the failure // still, we can at least log the failure
...@@ -607,14 +653,17 @@ where ...@@ -607,14 +653,17 @@ where
incoming: SubsystemResult<FromOverseer<Context::Message>>, incoming: SubsystemResult<FromOverseer<Context::Message>>,
jobs: &mut Jobs<Spawner, Job>, jobs: &mut Jobs<Spawner, Job>,
run_args: &Job::RunArgs, run_args: &Job::RunArgs,
err_tx: &mut Option<mpsc::Sender<(Option<Hash>, JobsError<Job::Error>)>> err_tx: &mut Option<mpsc::Sender<(Option<Hash>, JobsError<Job::Error>)>>,
) -> bool { ) -> bool {
use crate::FromOverseer::{Communication, Signal};
use crate::ActiveLeavesUpdate; use crate::ActiveLeavesUpdate;
use crate::OverseerSignal::{BlockFinalized, Conclude, ActiveLeaves}; use crate::FromOverseer::{Communication, Signal};
use crate::OverseerSignal::{ActiveLeaves, BlockFinalized, Conclude};
match incoming { match incoming {
Ok(Signal(ActiveLeaves(ActiveLeavesUpdate { activated, deactivated }))) => { Ok(Signal(ActiveLeaves(ActiveLeavesUpdate {
activated,
deactivated,
}))) => {
for hash in activated { for hash in activated {
if let Err(e) = jobs.spawn_job(hash, run_args.clone()) { if let Err(e) = jobs.spawn_job(hash, run_args.clone()) {
log::error!("Failed to spawn a job: {:?}", e); log::error!("Failed to spawn a job: {:?}", e);
...@@ -638,10 +687,11 @@ where ...@@ -638,10 +687,11 @@ where
// Forwarding the stream to a drain means we wait until all of the items in the stream // Forwarding the stream to a drain means we wait until all of the items in the stream
// have completed. Contrast with `into_future`, which turns it into a future of `(head, rest_stream)`. // have completed. Contrast with `into_future`, which turns it into a future of `(head, rest_stream)`.
use futures::sink::drain; use futures::sink::drain;
use futures::stream::StreamExt;
use futures::stream::FuturesUnordered; use futures::stream::FuturesUnordered;
use futures::stream::StreamExt;
if let Err(e) = jobs.running if let Err(e) = jobs
.running
.drain() .drain()
.map(|(_, handle)| handle.stop()) .map(|(_, handle)| handle.stop())
.collect::<FuturesUnordered<_>>() .collect::<FuturesUnordered<_>>()
...@@ -686,7 +736,11 @@ where ...@@ -686,7 +736,11 @@ where
} }
// handle an outgoing message. return true if we should break afterwards. // handle an outgoing message. return true if we should break afterwards.
async fn handle_outgoing(outgoing: Option<Job::FromJob>, ctx: &mut Context, err_tx: &mut Option<mpsc::Sender<(Option<Hash>, JobsError<Job::Error>)>>) -> bool { async fn handle_outgoing(
outgoing: Option<Job::FromJob>,
ctx: &mut Context,
err_tx: &mut Option<mpsc::Sender<(Option<Hash>, JobsError<Job::Error>)>>,
) -> bool {
match outgoing { match outgoing {
Some(msg) => { Some(msg) => {
if let Err(e) = ctx.send_message(msg.into()).await { if let Err(e) = ctx.send_message(msg.into()).await {
...@@ -713,7 +767,6 @@ where ...@@ -713,7 +767,6 @@ where
let run_args = self.run_args.clone(); let run_args = self.run_args.clone();
let errors = self.errors; let errors = self.errors;
let future = Box::pin(async move { let future = Box::pin(async move {
Self::run(ctx, run_args, spawner, errors).await; Self::run(ctx, run_args, spawner, errors).await;
}); });
...@@ -725,41 +778,107 @@ where ...@@ -725,41 +778,107 @@ where
} }
} }
/// Create a delegated subsystem
///
/// It is possible to create a type which implements `Subsystem` by simply doing:
///
/// ```ignore
/// pub type ExampleSubsystem<Spawner, Context> = util::JobManager<Spawner, Context, ExampleJob>;
/// ```
///
/// However, doing this requires that job itself and all types which comprise it (i.e. `ToJob`, `FromJob`, `Error`, `RunArgs`)
/// are public, to avoid exposing private types in public interfaces. It's possible to delegate instead, which
/// can reduce the total number of public types exposed, i.e.
///
/// ```ignore
/// type Manager<Spawner, Context> = util::JobManager<Spawner, Context, ExampleJob>;
/// pub struct ExampleSubsystem {
/// manager: Manager<Spawner, Context>,
/// }
///
/// impl<Spawner, Context> Subsystem<Context> for ExampleSubsystem<Spawner, Context> { ... }
/// ```
///
/// This dramatically reduces the number of public types in the crate; the only things which must be public are now
///
/// - `struct ExampleSubsystem` (defined by this macro)
/// - `type ToJob` (because it appears in a trait bound)
/// - `type RunArgs` (because it appears in a function signature)
///
/// Implementing this all manually is of course possible, but it's tedious; why bother? This macro exists for
/// the purpose of doing it automatically: