Unverified Commit cc2d7afd authored by Peter Goodspeed-Niklaus's avatar Peter Goodspeed-Niklaus Committed by GitHub
Browse files

Add test suite and minor refinements to the utility subsystem (#1403)

* get conclude signal working properly; don't allocate a vector

* wip: add test suite / example / explanation for using utility subsystem

Unfortunately, the test fails right now for reasons which seem
very odd. Just have to keep poking at it.

* explicitly import everything

* fix subsystem-util test

The root problem here was two-fold:

- there was a circular dependency from subsystem -> test-helpers/subsystem ->
  subsystem
- cfg(test) doesn't propagate between crates

The solution: move the subsystem test helpers into a sub-module
within subsystem. Publicly export them from the previous location
so no other code breaks.

Doing this has an additional benefit: it ensures that no production
code can ever accidentally use the subsystem helpers, as they are compile-
gated on cfg(test).

* fully commit to moving test helpers into a subsystem module

* add some more tests

* get rid of log tests in favor of real error forwarding

It's not obvious whether we'll ever really want to chase down
these errors outside a testing context, but having the capability
won't hurt.

* fix issue which caused test to hang on osx

* only require that job errors are PartialEq when testing

also fix polkadot-node-core-backing tests

* get rid of any notion of partialeq

* rethink testing

Combine tests of starting and stopping job: leaving a test executor
with a job running was pretty clearly the cause of the sometimes-hang.

Also, add a timeout so tests _can't_ hang anymore; they just fail
after a while.

* rename fwd_errors -> forward_errors

* warn on error propagation failure

* fix unused import leftover from merge

* derive eq for subsystemerror
parent 57b57ab3
Pipeline #101099 passed with stages
in 19 minutes and 59 seconds
......@@ -4457,7 +4457,6 @@ dependencies = [
"polkadot-node-primitives",
"polkadot-node-subsystem",
"polkadot-primitives",
"polkadot-subsystem-test-helpers",
"sc-network",
"sp-core",
"sp-runtime",
......@@ -4498,7 +4497,6 @@ dependencies = [
"polkadot-node-subsystem",
"polkadot-primitives",
"polkadot-statement-table",
"polkadot-subsystem-test-helpers",
"sc-client-api",
"sc-keystore",
"sp-api",
......@@ -4549,12 +4547,14 @@ dependencies = [
name = "polkadot-node-subsystem"
version = "0.1.0"
dependencies = [
"assert_matches",
"async-trait",
"derive_more 0.99.9",
"futures 0.3.5",
"futures-timer 3.0.2",
"log 0.4.8",
"parity-scale-codec",
"parking_lot 0.10.2",
"pin-project",
"polkadot-node-primitives",
"polkadot-primitives",
......@@ -4615,7 +4615,6 @@ dependencies = [
"polkadot-node-primitives",
"polkadot-node-subsystem",
"polkadot-primitives",
"polkadot-subsystem-test-helpers",
"sc-network",
"sp-core",
"sp-runtime",
......@@ -4970,7 +4969,6 @@ dependencies = [
"polkadot-node-primitives",
"polkadot-node-subsystem",
"polkadot-primitives",
"polkadot-subsystem-test-helpers",
"sp-core",
"sp-keyring",
"sp-runtime",
......@@ -4987,17 +4985,6 @@ dependencies = [
"sp-core",
]
[[package]]
name = "polkadot-subsystem-test-helpers"
version = "0.1.0"
dependencies = [
"async-trait",
"futures 0.3.5",
"parking_lot 0.10.2",
"polkadot-node-subsystem",
"sp-core",
]
[[package]]
name = "polkadot-test-runtime"
version = "0.8.17"
......
......@@ -52,7 +52,6 @@ members = [
"node/service",
"node/core/backing",
"node/subsystem",
"node/test-helpers/subsystem",
"node/test-service",
"parachain/test-parachains",
......
......@@ -22,5 +22,5 @@ bitvec = { version = "0.17.4", default-features = false, features = ["alloc"] }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" }
futures = { version = "0.3.5", features = ["thread-pool"] }
subsystem-test = { package = "polkadot-subsystem-test-helpers", path = "../../test-helpers/subsystem" }
assert_matches = "1.3.0"
polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem", features = [ "test-helpers" ] }
......@@ -745,7 +745,7 @@ where
/// Run this subsystem
pub async fn run(ctx: Context, keystore: KeyStorePtr, spawner: Spawner) {
<Manager<Spawner, Context>>::run(ctx, keystore, spawner).await
<Manager<Spawner, Context>>::run(ctx, keystore, spawner, None).await
}
}
......@@ -895,13 +895,13 @@ mod tests {
}
struct TestHarness {
virtual_overseer: subsystem_test::TestSubsystemContextHandle<CandidateBackingMessage>,
virtual_overseer: polkadot_subsystem::test_helpers::TestSubsystemContextHandle<CandidateBackingMessage>,
}
fn test_harness<T: Future<Output=()>>(keystore: KeyStorePtr, test: impl FnOnce(TestHarness) -> T) {
let pool = sp_core::testing::SpawnBlockingExecutor::new();
let (context, virtual_overseer) = subsystem_test::make_subsystem_context(pool.clone());
let (context, virtual_overseer) = polkadot_subsystem::test_helpers::make_subsystem_context(pool.clone());
let subsystem = CandidateBackingSubsystem::run(context, keystore, pool.clone());
......@@ -959,7 +959,7 @@ mod tests {
// Tests that the subsystem performs actions that are required on startup.
async fn test_startup(
virtual_overseer: &mut subsystem_test::TestSubsystemContextHandle<CandidateBackingMessage>,
virtual_overseer: &mut polkadot_subsystem::test_helpers::TestSubsystemContextHandle<CandidateBackingMessage>,
test_state: &TestState,
) {
// Start work on some new parent.
......
......@@ -18,6 +18,5 @@ polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsys
[dev-dependencies]
parking_lot = "0.10.0"
subsystem-test = { package = "polkadot-subsystem-test-helpers", path = "../../test-helpers/subsystem" }
assert_matches = "1.3.0"
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
......@@ -531,7 +531,7 @@ mod tests {
use assert_matches::assert_matches;
use polkadot_subsystem::messages::{StatementDistributionMessage, BitfieldDistributionMessage};
use subsystem_test::{SingleItemSink, SingleItemStream};
use polkadot_subsystem::test_helpers::{SingleItemSink, SingleItemStream};
// The subsystem's view of the network - only supports a single call to `event_stream`.
struct TestNetwork {
......@@ -550,7 +550,7 @@ mod tests {
TestNetwork,
TestNetworkHandle,
) {
let (net_tx, net_rx) = subsystem_test::single_item_sink();
let (net_tx, net_rx) = polkadot_subsystem::test_helpers::single_item_sink();
let (action_tx, action_rx) = mpsc::unbounded();
(
......@@ -631,13 +631,13 @@ mod tests {
struct TestHarness {
network_handle: TestNetworkHandle,
virtual_overseer: subsystem_test::TestSubsystemContextHandle<NetworkBridgeMessage>,
virtual_overseer: polkadot_subsystem::test_helpers::TestSubsystemContextHandle<NetworkBridgeMessage>,
}
fn test_harness<T: Future<Output=()>>(test: impl FnOnce(TestHarness) -> T) {
let pool = sp_core::testing::SpawnBlockingExecutor::new();
let (network, network_handle) = new_test_network();
let (context, virtual_overseer) = subsystem_test::make_subsystem_context(pool);
let (context, virtual_overseer) = polkadot_subsystem::test_helpers::make_subsystem_context(pool);
let network_bridge = run_network(
network,
......
......@@ -18,6 +18,6 @@ polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsys
[dev-dependencies]
parking_lot = "0.10.0"
subsystem-test = { package = "polkadot-subsystem-test-helpers", path = "../../test-helpers/subsystem" }
assert_matches = "1.3.0"
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem", features = [ "test-helpers" ] }
......@@ -620,7 +620,7 @@ mod tests {
};
let pool = sp_core::testing::SpawnBlockingExecutor::new();
let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool);
let (mut ctx, mut handle) = polkadot_subsystem::test_helpers::make_subsystem_context(pool);
let mut descriptor = CandidateDescriptor::default();
descriptor.pov_hash = pov_hash;
......@@ -700,7 +700,7 @@ mod tests {
};
let pool = sp_core::testing::SpawnBlockingExecutor::new();
let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool);
let (mut ctx, mut handle) = polkadot_subsystem::test_helpers::make_subsystem_context(pool);
let mut descriptor = CandidateDescriptor::default();
descriptor.pov_hash = pov_hash;
......@@ -778,7 +778,7 @@ mod tests {
};
let pool = sp_core::testing::SpawnBlockingExecutor::new();
let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool);
let (mut ctx, mut handle) = polkadot_subsystem::test_helpers::make_subsystem_context(pool);
executor::block_on(async move {
handle_network_update(
......@@ -850,7 +850,7 @@ mod tests {
};
let pool = sp_core::testing::SpawnBlockingExecutor::new();
let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool);
let (mut ctx, mut handle) = polkadot_subsystem::test_helpers::make_subsystem_context(pool);
executor::block_on(async move {
// Peer A answers our request before peer B.
......@@ -938,7 +938,7 @@ mod tests {
};
let pool = sp_core::testing::SpawnBlockingExecutor::new();
let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool);
let (mut ctx, mut handle) = polkadot_subsystem::test_helpers::make_subsystem_context(pool);
executor::block_on(async move {
// Peer A answers our request: right relay parent, awaited hash, wrong PoV.
......@@ -1001,7 +1001,7 @@ mod tests {
};
let pool = sp_core::testing::SpawnBlockingExecutor::new();
let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool);
let (mut ctx, mut handle) = polkadot_subsystem::test_helpers::make_subsystem_context(pool);
executor::block_on(async move {
// Peer A answers our request: right relay parent, awaited hash, wrong PoV.
......@@ -1062,7 +1062,7 @@ mod tests {
};
let pool = sp_core::testing::SpawnBlockingExecutor::new();
let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool);
let (mut ctx, mut handle) = polkadot_subsystem::test_helpers::make_subsystem_context(pool);
executor::block_on(async move {
// Peer A answers our request: right relay parent, awaited hash, wrong PoV.
......@@ -1120,7 +1120,7 @@ mod tests {
};
let pool = sp_core::testing::SpawnBlockingExecutor::new();
let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool);
let (mut ctx, mut handle) = polkadot_subsystem::test_helpers::make_subsystem_context(pool);
executor::block_on(async move {
let max_plausibly_awaited = n_validators * 2;
......@@ -1205,7 +1205,7 @@ mod tests {
};
let pool = sp_core::testing::SpawnBlockingExecutor::new();
let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool);
let (mut ctx, mut handle) = polkadot_subsystem::test_helpers::make_subsystem_context(pool);
executor::block_on(async move {
let pov_hash = make_pov(vec![1, 2, 3]).hash();
......@@ -1267,7 +1267,7 @@ mod tests {
};
let pool = sp_core::testing::SpawnBlockingExecutor::new();
let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool);
let (mut ctx, mut handle) = polkadot_subsystem::test_helpers::make_subsystem_context(pool);
executor::block_on(async move {
let pov_hash = make_pov(vec![1, 2, 3]).hash();
......@@ -1344,7 +1344,7 @@ mod tests {
};
let pool = sp_core::testing::SpawnBlockingExecutor::new();
let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool);
let (mut ctx, mut handle) = polkadot_subsystem::test_helpers::make_subsystem_context(pool);
executor::block_on(async move {
handle_network_update(
......@@ -1427,7 +1427,7 @@ mod tests {
};
let pool = sp_core::testing::SpawnBlockingExecutor::new();
let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool);
let (mut ctx, mut handle) = polkadot_subsystem::test_helpers::make_subsystem_context(pool);
executor::block_on(async move {
handle_network_update(
......
......@@ -21,7 +21,7 @@ indexmap = "1.4.0"
[dev-dependencies]
parking_lot = "0.10.0"
subsystem-test = { package = "polkadot-subsystem-test-helpers", path = "../../test-helpers/subsystem" }
polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem", features = ["test-helpers"] }
assert_matches = "1.3.0"
sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
......@@ -1213,7 +1213,7 @@ mod tests {
};
let pool = sp_core::testing::SpawnBlockingExecutor::new();
let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool);
let (mut ctx, mut handle) = polkadot_subsystem::test_helpers::make_subsystem_context(pool);
let peer = PeerId::random();
executor::block_on(async move {
......@@ -1305,7 +1305,7 @@ mod tests {
].into_iter().collect();
let pool = sp_core::testing::SpawnBlockingExecutor::new();
let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool);
let (mut ctx, mut handle) = polkadot_subsystem::test_helpers::make_subsystem_context(pool);
executor::block_on(async move {
let statement = {
......
......@@ -13,6 +13,7 @@ futures-timer = "3.0.2"
keystore = { package = "sc-keystore", git = "https://github.com/paritytech/substrate", branch = "master" }
log = "0.4.8"
parity-scale-codec = "1.3.0"
parking_lot = { version = "0.10.0", optional = true }
pin-project = "0.4.22"
polkadot-node-primitives = { path = "../primitives" }
polkadot-primitives = { path = "../../primitives" }
......@@ -20,3 +21,12 @@ polkadot-statement-table = { path = "../../statement-table" }
sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
streamunordered = "0.5.1"
[dev-dependencies]
assert_matches = "1.3.0"
async-trait = "0.1"
futures = { version = "0.3.5", features = ["thread-pool"] }
parking_lot = "0.10.0"
[features]
test-helpers = [ "parking_lot" ]
......@@ -35,6 +35,8 @@ use crate::messages::AllMessages;
pub mod messages;
pub mod util;
#[cfg(any(test, feature = "test-helpers"))]
pub mod test_helpers;
/// Signals sent by an overseer to a subsystem.
#[derive(PartialEq, Clone, Debug)]
......@@ -71,7 +73,7 @@ pub enum FromOverseer<M> {
/// * Subsystems dying when they are not expected to
/// * Subsystems not dying when they are told to die
/// * etc.
#[derive(Debug)]
#[derive(Debug, PartialEq, Eq)]
pub struct SubsystemError;
impl From<mpsc::SendError> for SubsystemError {
......
......@@ -408,4 +408,10 @@ pub enum AllMessages {
AvailabilityStore(AvailabilityStoreMessage),
/// Message for the network bridge subsystem.
NetworkBridge(NetworkBridgeMessage),
/// Test message
///
/// This variant is only valid while testing, but makes the process of testing the
/// subsystem job manager much simpler.
#[cfg(test)]
Test(String),
}
......@@ -16,8 +16,8 @@
//! Utilities for testing subsystems.
use polkadot_subsystem::{SubsystemContext, FromOverseer, SubsystemResult, SubsystemError};
use polkadot_subsystem::messages::AllMessages;
use crate::{SubsystemContext, FromOverseer, SubsystemResult, SubsystemError};
use crate::messages::AllMessages;
use futures::prelude::*;
use futures::channel::mpsc;
......
......@@ -22,7 +22,7 @@
use crate::{
messages::{AllMessages, RuntimeApiMessage, RuntimeApiRequest, SchedulerRoster},
FromOverseer, SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemResult,
FromOverseer, SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemError, SubsystemResult,
};
use futures::{
channel::{mpsc, oneshot},
......@@ -67,12 +67,20 @@ pub enum Error {
/// Attempted to send on a MPSC channel which has been canceled
#[from]
Mpsc(mpsc::SendError),
/// A subsystem error
#[from]
Subsystem(SubsystemError),
/// The type system wants this even though it doesn't make sense
#[from]
Infallible(std::convert::Infallible),
/// Attempted to convert from an AllMessages to a FromJob, and failed.
SenderConversion(String),
/// The local node is not a validator.
NotAValidator,
/// The desired job is not present in the jobs list.
JobNotFound(Hash),
/// Already forwarding errors to another sender
AlreadyForwarding,
}
/// Request some data from the `RuntimeApi`.
......@@ -262,7 +270,7 @@ pub trait ToJobTrait: TryFrom<AllMessages> {
}
/// A JobHandle manages a particular job for a subsystem.
pub struct JobHandle<ToJob> {
struct JobHandle<ToJob> {
abort_handle: future::AbortHandle,
to_job: mpsc::Sender<ToJob>,
finished: oneshot::Receiver<()>,
......@@ -271,23 +279,23 @@ pub struct JobHandle<ToJob> {
impl<ToJob> JobHandle<ToJob> {
/// Send a message to the job.
pub async fn send_msg(&mut self, msg: ToJob) -> Result<(), Error> {
async fn send_msg(&mut self, msg: ToJob) -> Result<(), Error> {
self.to_job.send(msg).await.map_err(Into::into)
}
/// Abort the job without waiting for a graceful shutdown
pub fn abort(self) {
self.abort_handle.abort();
}
}
impl<ToJob: ToJobTrait> JobHandle<ToJob> {
/// Stop this job gracefully.
///
/// If it hasn't shut itself down after `JOB_GRACEFUL_STOP_DURATION`, abort it.
pub async fn stop(mut self) {
async fn stop(mut self) {
// we don't actually care if the message couldn't be sent
let _ = self.to_job.send(ToJob::STOP).await;
if let Err(_) = self.to_job.send(ToJob::STOP).await {
// no need to wait further here: the job is either stalled or
// disconnected, and in either case, we can just abort it immediately
self.abort_handle.abort();
return;
}
let stop_timer = Delay::new(JOB_GRACEFUL_STOP_DURATION);
match future::select(stop_timer, self.finished).await {
......@@ -310,7 +318,7 @@ pub trait JobTrait: Unpin {
/// Message type from the job. Typically a subset of AllMessages.
type FromJob: 'static + Into<AllMessages> + Send;
/// Job runtime error.
type Error: std::fmt::Debug;
type Error: 'static + std::fmt::Debug + Send;
/// Extra arguments this job needs to run properly.
///
/// If no extra information is needed, it is perfectly acceptable to set it to `()`.
......@@ -323,8 +331,8 @@ pub trait JobTrait: Unpin {
fn run(
parent: Hash,
run_args: Self::RunArgs,
rx_to: mpsc::Receiver<Self::ToJob>,
tx_from: mpsc::Sender<Self::FromJob>,
receiver: mpsc::Receiver<Self::ToJob>,
sender: mpsc::Sender<Self::FromJob>,
) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>>;
/// Handle a message which has no relay parent, and therefore can't be dispatched to a particular job
......@@ -342,6 +350,18 @@ pub trait JobTrait: Unpin {
}
}
/// Error which can be returned by the jobs manager.
///
/// Wraps the two kinds of failure the manager reports: the utility module's
/// own [`Error`] type, and the job-specific error type supplied through the
/// `JobError` type parameter.
#[derive(Debug, derive_more::From)]
pub enum JobsError<JobError> {
/// An error of the utility module's own [`Error`] type.
///
/// The `#[from]` attribute asks `derive_more` to generate the
/// `From<Error>` conversion for this variant.
#[from]
Utility(Error),
/// An error produced by the job itself.
///
/// Not marked `#[from]`; `spawn_job` wraps job errors in this variant
/// explicitly.
Job(JobError),
}
/// Jobs manager for a subsystem
///
/// - Spawns new jobs for a given relay-parent on demand.
......@@ -356,9 +376,10 @@ pub struct Jobs<Spawner, Job: JobTrait> {
#[pin]
outgoing_msgs: StreamUnordered<mpsc::Receiver<Job::FromJob>>,
job: std::marker::PhantomData<Job>,
errors: Option<mpsc::Sender<(Option<Hash>, JobsError<Job::Error>)>>,
}
impl<Spawner: SpawnNamed, Job: JobTrait> Jobs<Spawner, Job> {
impl<Spawner: SpawnNamed, Job: 'static + JobTrait> Jobs<Spawner, Job> {
/// Create a new Jobs manager which handles spawning appropriate jobs.
pub fn new(spawner: Spawner) -> Self {
Self {
......@@ -366,15 +387,31 @@ impl<Spawner: SpawnNamed, Job: JobTrait> Jobs<Spawner, Job> {
running: HashMap::new(),
outgoing_msgs: StreamUnordered::new(),
job: std::marker::PhantomData,
errors: None,
}
}
/// Monitor errors which may occur during handling of a spawned job.
///
/// By default, an error in a job is simply logged. Once this is called,
/// the error is forwarded onto the provided channel.
///
/// Errors if the error channel already exists.
pub fn forward_errors(&mut self, tx: mpsc::Sender<(Option<Hash>, JobsError<Job::Error>)>) -> Result<(), Error> {
if self.errors.is_some() { return Err(Error::AlreadyForwarding) }
self.errors = Some(tx);
Ok(())
}
/// Spawn a new job for this `parent_hash`, with whatever args are appropriate.
fn spawn_job(&mut self, parent_hash: Hash, run_args: Job::RunArgs) -> Result<(), Error> {
let (to_job_tx, to_job_rx) = mpsc::channel(JOB_CHANNEL_CAPACITY);
let (from_job_tx, from_job_rx) = mpsc::channel(JOB_CHANNEL_CAPACITY);
let (finished_tx, finished) = oneshot::channel();
// clone the error transmitter to move into the future
let err_tx = self.errors.clone();
let (future, abort_handle) = future::abortable(async move {
if let Err(e) = Job::run(parent_hash, run_args, to_job_rx, from_job_tx).await {
log::error!(
......@@ -383,12 +420,26 @@ impl<Spawner: SpawnNamed, Job: JobTrait> Jobs<Spawner, Job> {
parent_hash,
e,
);
if let Some(mut err_tx) = err_tx {
// if we can't send the notification of error on the error channel, then
// there's no point trying to propagate this error onto the channel too;
// all we can do is warn that error propagation has failed
if let Err(e) = err_tx.send((Some(parent_hash), JobsError::Job(e))).await {
log::warn!("failed to forward error: {:?}", e);
}
}
}
});
// discard output
// the spawn mechanism requires that the spawned future has no output
let future = async move {
// job errors are already handled within the future, meaning
// that any errors here are due to the abortable mechanism.
// failure to abort isn't of interest.
let _ = future.await;
// transmission failure here is only possible if the receiver is closed,
// which means the handle is dropped, which means we don't care anymore
let _ = finished_tx.send(());
};
self.spawner.spawn(Job::NAME, future.boxed());
......@@ -472,13 +523,14 @@ pub struct JobManager<Spawner, Context, Job: JobTrait> {
run_args: Job::RunArgs,
context: std::marker::PhantomData<Context>,
job: std::marker::PhantomData<Job>,
errors: Option<mpsc::Sender<(Option<Hash>, JobsError<Job::Error>)>>,
}
impl<Spawner, Context, Job> JobManager<Spawner, Context, Job>
where
Spawner: SpawnNamed + Clone + Send + Unpin,
Context: SubsystemContext,
Job: JobTrait,
Job: 'static + JobTrait,
Job::RunArgs: Clone,
Job::ToJob: TryFrom<AllMessages> + TryFrom<<Context as SubsystemContext>::Message> + Sync,
{
......@@ -489,9 +541,22 @@ where
run_args,
context: std::marker::PhantomData,
job: std::marker::PhantomData,
errors: None,
}
}
/// Monitor errors which may occur during handling of a spawned job.
///
/// By default, an error in a job is simply logged. Once this is called,
/// the error is forwarded onto the provided channel.
///
/// Errors if the error channel already exists.
pub fn forward_errors(&mut self, tx: mpsc::Sender<(Option<Hash>, JobsError<Job::Error>)>) -> Result<(), Error> {
if self.errors.is_some() { return Err(Error::AlreadyForwarding) }
self.errors = Some(tx);
Ok(())
}
/// Run this subsystem
///
/// Conceptually, this is very simple: it just loops forever.
......@@ -500,23 +565,41 @@ where
/// - On other incoming messages, if they can be converted into Job::ToJob and
/// include a hash, then they're forwarded to the appropriate individual job.
/// - On outgoing messages from the jobs, it forwards them to the overseer.
pub async fn run(mut ctx: Context, run_args: Job::RunArgs, spawner: Spawner) {
///
/// If `err_tx` is not `None`, errors are forwarded onto that channel as they occur.
/// Otherwise, most are logged and then discarded.
pub async fn run(mut ctx: Context, run_args: Job::RunArgs, spawner: Spawner, mut err_tx: Option<mpsc::Sender<(Option<Hash>, JobsError<Job::Error>)>>) {
let mut jobs = Jobs::new(spawner.clone());
if let Some(ref err_tx) = err_tx {
jobs.forward_errors(err_tx.clone()).expect("we never call this twice in this context; qed");
}
loop {
select! {
incoming = ctx.recv().fuse() => if Self::handle_incoming(incoming, &mut jobs, &run_args).await { break },
outgoing = jobs.next().fuse() => if Self::handle_outgoing(outgoing, &mut ctx).await { break },
incoming = ctx.recv().fuse() => if Self::handle_incoming(incoming, &mut jobs, &run_args, &mut err_tx).await { break },
outgoing = jobs.next().fuse() => if Self::handle_outgoing(outgoing, &mut ctx, &mut err_tx).await { break },
complete => break,
}
}
}
// if we have a channel on which to forward errors, do so
async fn fwd_err(hash: Option<Hash>, err: JobsError<Job::Error>, err_tx: &mut Option<mpsc::Sender<(Option<Hash>, JobsError<Job::Error>)>>) {
if let Some(err_tx) = err_tx {
// if we can't send on the error transmission channel, we can't do anything useful about it
// still, we can at least log the failure
if let Err(e) = err_tx.send((hash, err)).await {