Unverified Commit d19ca7ce authored by Bastian Köcher's avatar Bastian Köcher Committed by GitHub
Browse files

Parachain improvements (#1905)



* Parachain improvements

- Set the parachains configuration in Rococo genesis
- Don't stop the overseer when a subsystem job is stopped
- Several small code changes

* Remove unused functionality

* Return error from the runtime instead of printing it

* Apply suggestions from code review

Co-authored-by: default avatarPeter Goodspeed-Niklaus <coriolinus@users.noreply.github.com>

* Update primitives/src/v1.rs

Co-authored-by: default avatarPeter Goodspeed-Niklaus <coriolinus@users.noreply.github.com>

* Update primitives/src/v1.rs

Co-authored-by: default avatarPeter Goodspeed-Niklaus <coriolinus@users.noreply.github.com>

* Fix test

* Revert "Update primitives/src/v1.rs"

This reverts commit 11fce278.

* Revert "Update primitives/src/v1.rs"

This reverts commit d6439fed.

* Revert "Return error from the runtime instead of printing it"

This reverts commit cb4b5c08.

* Revert "Fix test"

This reverts commit 0c5fa1b5

.

* Update runtime/parachains/src/runtime_api_impl/v1.rs

Co-authored-by: Sergey Pepyakin's avatarSergei Shulepov <sergei@parity.io>

Co-authored-by: default avatarPeter Goodspeed-Niklaus <coriolinus@users.noreply.github.com>
Co-authored-by: Sergey Pepyakin's avatarSergei Shulepov <sergei@parity.io>
parent 6949243d
Pipeline #113049 passed with stages
in 24 minutes and 53 seconds
...@@ -5398,7 +5398,7 @@ dependencies = [ ...@@ -5398,7 +5398,7 @@ dependencies = [
"futures 0.3.5", "futures 0.3.5",
"hex-literal 0.2.1", "hex-literal 0.2.1",
"libsecp256k1", "libsecp256k1",
"log 0.3.9", "log 0.4.11",
"pallet-authority-discovery", "pallet-authority-discovery",
"pallet-authorship", "pallet-authorship",
"pallet-babe", "pallet-babe",
...@@ -5418,7 +5418,6 @@ dependencies = [ ...@@ -5418,7 +5418,6 @@ dependencies = [
"rustc-hex", "rustc-hex",
"sc-keystore", "sc-keystore",
"serde", "serde",
"serde_derive",
"serde_json", "serde_json",
"sp-api", "sp-api",
"sp-application-crypto", "sp-application-crypto",
...@@ -5475,6 +5474,7 @@ dependencies = [ ...@@ -5475,6 +5474,7 @@ dependencies = [
"polkadot-primitives", "polkadot-primitives",
"polkadot-rpc", "polkadot-rpc",
"polkadot-runtime", "polkadot-runtime",
"polkadot-runtime-parachains",
"polkadot-statement-distribution", "polkadot-statement-distribution",
"polkadot-test-client", "polkadot-test-client",
"rococo-runtime", "rococo-runtime",
......
...@@ -288,7 +288,7 @@ impl CandidateBackingJob { ...@@ -288,7 +288,7 @@ impl CandidateBackingJob {
ToJob::CandidateBacking(msg) => { ToJob::CandidateBacking(msg) => {
self.process_msg(msg).await?; self.process_msg(msg).await?;
} }
_ => break, ToJob::Stop => break,
} }
} }
...@@ -838,7 +838,7 @@ impl util::JobTrait for CandidateBackingJob { ...@@ -838,7 +838,7 @@ impl util::JobTrait for CandidateBackingJob {
let (assignment, required_collator) = match assignment { let (assignment, required_collator) = match assignment {
None => return Ok(()), // no need to work. None => return Ok(()), // no need to work.
Some((a, r)) => (a, r), Some(r) => r,
}; };
let job = CandidateBackingJob { let job = CandidateBackingJob {
......
...@@ -323,10 +323,7 @@ async fn candidate_is_valid( ...@@ -323,10 +323,7 @@ async fn candidate_is_valid(
pov: Arc<PoV>, pov: Arc<PoV>,
sender: mpsc::Sender<FromJob>, sender: mpsc::Sender<FromJob>,
) -> bool { ) -> bool {
std::matches!( candidate_is_valid_inner(candidate_descriptor, pov, sender).await.unwrap_or(false)
candidate_is_valid_inner(candidate_descriptor, pov, sender).await,
Ok(true)
)
} }
// find out whether a candidate is valid or not, with a worse interface // find out whether a candidate is valid or not, with a worse interface
...@@ -342,7 +339,7 @@ async fn candidate_is_valid_inner( ...@@ -342,7 +339,7 @@ async fn candidate_is_valid_inner(
CandidateValidationMessage::ValidateFromChainState(candidate_descriptor, pov, tx), CandidateValidationMessage::ValidateFromChainState(candidate_descriptor, pov, tx),
)) ))
.await?; .await?;
Ok(std::matches!( Ok(matches!(
rx.await, rx.await,
Ok(Ok(ValidationResult::Valid(_, _))) Ok(Ok(ValidationResult::Valid(_, _)))
)) ))
......
...@@ -294,11 +294,11 @@ async fn find_assumed_validation_data( ...@@ -294,11 +294,11 @@ async fn find_assumed_validation_data(
for assumption in ASSUMPTIONS { for assumption in ASSUMPTIONS {
let outcome = check_assumption_validation_data(ctx, descriptor, *assumption).await?; let outcome = check_assumption_validation_data(ctx, descriptor, *assumption).await?;
let () = match outcome { match outcome {
AssumptionCheckOutcome::Matches(_, _) => return Ok(outcome), AssumptionCheckOutcome::Matches(_, _) => return Ok(outcome),
AssumptionCheckOutcome::BadRequest => return Ok(outcome), AssumptionCheckOutcome::BadRequest => return Ok(outcome),
AssumptionCheckOutcome::DoesNotMatch => continue, AssumptionCheckOutcome::DoesNotMatch => continue,
}; }
} }
Ok(AssumptionCheckOutcome::DoesNotMatch) Ok(AssumptionCheckOutcome::DoesNotMatch)
......
...@@ -18,9 +18,7 @@ use std::collections::HashMap; ...@@ -18,9 +18,7 @@ use std::collections::HashMap;
use super::{TARGET, Result}; use super::{TARGET, Result};
use futures::channel::oneshot; use futures::{StreamExt, channel::oneshot, task::Poll};
use futures::stream::StreamExt as _;
use futures::task::Poll;
use log::warn; use log::warn;
use polkadot_primitives::v1::{ use polkadot_primitives::v1::{
......
...@@ -20,6 +20,7 @@ use std::task::Poll; ...@@ -20,6 +20,7 @@ use std::task::Poll;
use futures::{ use futures::{
StreamExt, StreamExt,
FutureExt,
channel::oneshot, channel::oneshot,
future::BoxFuture, future::BoxFuture,
stream::FuturesUnordered, stream::FuturesUnordered,
...@@ -122,7 +123,6 @@ impl CollationRequest { ...@@ -122,7 +123,6 @@ impl CollationRequest {
request_id, request_id,
} = self; } = self;
match received.timeout(timeout).await { match received.timeout(timeout).await {
None => Timeout(request_id), None => Timeout(request_id),
Some(_) => Received(request_id), Some(_) => Received(request_id),
...@@ -417,9 +417,7 @@ where ...@@ -417,9 +417,7 @@ where
state.requests_info.insert(request_id, per_request); state.requests_info.insert(request_id, per_request);
state.requests_in_progress.push(Box::pin(async move { state.requests_in_progress.push(request.wait().boxed());
request.wait().await
}));
let wire_message = protocol_v1::CollatorProtocolMessage::RequestCollation( let wire_message = protocol_v1::CollatorProtocolMessage::RequestCollation(
request_id, request_id,
......
...@@ -1608,6 +1608,8 @@ fn spawn<S: SpawnNamed, M: Send + 'static>( ...@@ -1608,6 +1608,8 @@ fn spawn<S: SpawnNamed, M: Send + 'static>(
let fut = Box::pin(async move { let fut = Box::pin(async move {
if let Err(e) = future.await { if let Err(e) = future.await {
log::error!("Subsystem {} exited with error {:?}", name, e); log::error!("Subsystem {} exited with error {:?}", name, e);
} else {
log::debug!("Subsystem {} exited without an error", name);
} }
let _ = tx.send(()); let _ = tx.send(());
}); });
......
...@@ -68,6 +68,7 @@ polkadot-primitives = { path = "../../primitives" } ...@@ -68,6 +68,7 @@ polkadot-primitives = { path = "../../primitives" }
polkadot-rpc = { path = "../../rpc" } polkadot-rpc = { path = "../../rpc" }
polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../subsystem" } polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../subsystem" }
polkadot-node-subsystem-util = { path = "../subsystem-util" } polkadot-node-subsystem-util = { path = "../subsystem-util" }
polkadot-runtime-parachains = { path = "../../runtime/parachains" }
# Polkadot Runtimes # Polkadot Runtimes
polkadot-runtime = { path = "../../runtime/polkadot" } polkadot-runtime = { path = "../../runtime/polkadot" }
......
...@@ -771,6 +771,17 @@ fn rococo_staging_testnet_config_genesis(wasm_binary: &[u8]) -> rococo_runtime:: ...@@ -771,6 +771,17 @@ fn rococo_staging_testnet_config_genesis(wasm_binary: &[u8]) -> rococo_runtime::
pallet_sudo: Some(rococo_runtime::SudoConfig { pallet_sudo: Some(rococo_runtime::SudoConfig {
key: endowed_accounts[0].clone(), key: endowed_accounts[0].clone(),
}), }),
parachains_configuration: Some(rococo_runtime::ParachainConfigConfig {
config: polkadot_runtime_parachains::configuration::HostConfiguration {
validation_upgrade_frequency: 600u32,
validation_upgrade_delay: 300,
acceptance_period: 1200,
max_code_size: 5 * 1024 * 1024,
max_head_data_size: 32 * 1024,
group_rotation_frequency: 10,
..Default::default()
},
}),
} }
} }
...@@ -1212,6 +1223,17 @@ pub fn rococo_testnet_genesis( ...@@ -1212,6 +1223,17 @@ pub fn rococo_testnet_genesis(
}), }),
pallet_staking: Some(Default::default()), pallet_staking: Some(Default::default()),
pallet_sudo: Some(rococo_runtime::SudoConfig { key: root_key }), pallet_sudo: Some(rococo_runtime::SudoConfig { key: root_key }),
parachains_configuration: Some(rococo_runtime::ParachainConfigConfig {
config: polkadot_runtime_parachains::configuration::HostConfiguration {
validation_upgrade_frequency: 600u32,
validation_upgrade_delay: 300,
acceptance_period: 1200,
max_code_size: 5 * 1024 * 1024,
max_head_data_size: 32 * 1024,
group_rotation_frequency: 10,
..Default::default()
},
}),
} }
} }
......
...@@ -22,9 +22,6 @@ ...@@ -22,9 +22,6 @@
//! //!
//! This crate also reexports Prometheus metric types which are expected to be implemented by subsystems. //! This crate also reexports Prometheus metric types which are expected to be implemented by subsystems.
#![deny(unused_results)]
// #![deny(unused_crate_dependencies] causes false positives
// https://github.com/rust-lang/rust/issues/57274
#![warn(missing_docs)] #![warn(missing_docs)]
use polkadot_node_subsystem::{ use polkadot_node_subsystem::{
...@@ -32,17 +29,10 @@ use polkadot_node_subsystem::{ ...@@ -32,17 +29,10 @@ use polkadot_node_subsystem::{
messages::{AllMessages, RuntimeApiMessage, RuntimeApiRequest, RuntimeApiSender}, messages::{AllMessages, RuntimeApiMessage, RuntimeApiRequest, RuntimeApiSender},
FromOverseer, SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemError, SubsystemResult, FromOverseer, SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemError, SubsystemResult,
}; };
use futures::{ use futures::{channel::{mpsc, oneshot}, prelude::*, select, stream::Stream};
channel::{mpsc, oneshot},
future::Either,
prelude::*,
select,
stream::Stream,
task,
};
use futures_timer::Delay; use futures_timer::Delay;
use parity_scale_codec::Encode; use parity_scale_codec::Encode;
use pin_project::{pin_project, pinned_drop}; use pin_project::pin_project;
use polkadot_primitives::v1::{ use polkadot_primitives::v1::{
CandidateEvent, CommittedCandidateReceipt, CoreState, EncodeAs, PersistedValidationData, CandidateEvent, CommittedCandidateReceipt, CoreState, EncodeAs, PersistedValidationData,
GroupRotationInfo, Hash, Id as ParaId, ValidationData, OccupiedCoreAssumption, GroupRotationInfo, Hash, Id as ParaId, ValidationData, OccupiedCoreAssumption,
...@@ -59,7 +49,7 @@ use sp_keystore::{ ...@@ -59,7 +49,7 @@ use sp_keystore::{
Error as KeystoreError, Error as KeystoreError,
}; };
use std::{ use std::{
collections::HashMap, collections::{HashMap, hash_map::Entry},
convert::{TryFrom, TryInto}, convert::{TryFrom, TryInto},
marker::Unpin, marker::Unpin,
pin::Pin, pin::Pin,
...@@ -81,7 +71,6 @@ pub mod reexports { ...@@ -81,7 +71,6 @@ pub mod reexports {
}; };
} }
/// Duration a job will wait after sending a stop signal before hard-aborting. /// Duration a job will wait after sending a stop signal before hard-aborting.
pub const JOB_GRACEFUL_STOP_DURATION: Duration = Duration::from_secs(1); pub const JOB_GRACEFUL_STOP_DURATION: Duration = Duration::from_secs(1);
/// Capacity of channels to and from individual jobs /// Capacity of channels to and from individual jobs
...@@ -111,9 +100,6 @@ pub enum Error { ...@@ -111,9 +100,6 @@ pub enum Error {
/// The local node is not a validator. /// The local node is not a validator.
#[error("Node is not a validator")] #[error("Node is not a validator")]
NotAValidator, NotAValidator,
/// The desired job is not present in the jobs list.
#[error("Relay parent {0} not of interest")]
JobNotFound(Hash),
/// Already forwarding errors to another sender /// Already forwarding errors to another sender
#[error("AlreadyForwarding")] #[error("AlreadyForwarding")]
AlreadyForwarding, AlreadyForwarding,
...@@ -415,12 +401,19 @@ pub trait ToJobTrait: TryFrom<AllMessages> { ...@@ -415,12 +401,19 @@ pub trait ToJobTrait: TryFrom<AllMessages> {
fn relay_parent(&self) -> Option<Hash>; fn relay_parent(&self) -> Option<Hash>;
} }
struct AbortOnDrop(future::AbortHandle);
impl Drop for AbortOnDrop {
fn drop(&mut self) {
self.0.abort();
}
}
/// A JobHandle manages a particular job for a subsystem. /// A JobHandle manages a particular job for a subsystem.
struct JobHandle<ToJob> { struct JobHandle<ToJob> {
abort_handle: future::AbortHandle, _abort_handle: AbortOnDrop,
to_job: mpsc::Sender<ToJob>, to_job: mpsc::Sender<ToJob>,
finished: oneshot::Receiver<()>, finished: oneshot::Receiver<()>,
outgoing_msgs_handle: usize,
} }
impl<ToJob> JobHandle<ToJob> { impl<ToJob> JobHandle<ToJob> {
...@@ -436,20 +429,13 @@ impl<ToJob: ToJobTrait> JobHandle<ToJob> { ...@@ -436,20 +429,13 @@ impl<ToJob: ToJobTrait> JobHandle<ToJob> {
/// If it hasn't shut itself down after `JOB_GRACEFUL_STOP_DURATION`, abort it. /// If it hasn't shut itself down after `JOB_GRACEFUL_STOP_DURATION`, abort it.
async fn stop(mut self) { async fn stop(mut self) {
// we don't actually care if the message couldn't be sent // we don't actually care if the message couldn't be sent
if let Err(_) = self.to_job.send(ToJob::STOP).await { if self.to_job.send(ToJob::STOP).await.is_err() {
// no need to wait further here: the job is either stalled or
// disconnected, and in either case, we can just abort it immediately
self.abort_handle.abort();
return; return;
} }
let stop_timer = Delay::new(JOB_GRACEFUL_STOP_DURATION); let stop_timer = Delay::new(JOB_GRACEFUL_STOP_DURATION);
match future::select(stop_timer, self.finished).await { future::select(stop_timer, self.finished).await;
Either::Left((_, _)) => {}
Either::Right((_, _)) => {
self.abort_handle.abort();
}
}
} }
} }
...@@ -521,20 +507,6 @@ pub trait JobTrait: Unpin { ...@@ -521,20 +507,6 @@ pub trait JobTrait: Unpin {
receiver: mpsc::Receiver<Self::ToJob>, receiver: mpsc::Receiver<Self::ToJob>,
sender: mpsc::Sender<Self::FromJob>, sender: mpsc::Sender<Self::FromJob>,
) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>>; ) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>>;
/// Handle a message which has no relay parent, and therefore can't be dispatched to a particular job
///
/// By default, this is implemented with a NOP function. However, if
/// ToJob occasionally has messages which do not correspond to a particular
/// parent relay hash, then this function will be spawned as a one-off
/// task to handle those messages.
// TODO: the API here is likely not precisely what we want; figure it out more
// once we're implementing a subsystem which actually needs this feature.
// In particular, we're quite likely to want this to return a future instead of
// interrupting the active thread for the duration of the handler.
fn handle_unanchored_msg(_msg: Self::ToJob) -> Result<(), Self::Error> {
Ok(())
}
} }
/// Error which can be returned by the jobs manager /// Error which can be returned by the jobs manager
...@@ -557,12 +529,12 @@ pub enum JobsError<JobError: 'static + std::error::Error> { ...@@ -557,12 +529,12 @@ pub enum JobsError<JobError: 'static + std::error::Error> {
/// - Dispatches messages to the appropriate job for a given relay-parent. /// - Dispatches messages to the appropriate job for a given relay-parent.
/// - When dropped, aborts all remaining jobs. /// - When dropped, aborts all remaining jobs.
/// - implements `Stream<Item=Job::FromJob>`, collecting all messages from subordinate jobs. /// - implements `Stream<Item=Job::FromJob>`, collecting all messages from subordinate jobs.
#[pin_project(PinnedDrop)] #[pin_project]
pub struct Jobs<Spawner, Job: JobTrait> { pub struct Jobs<Spawner, Job: JobTrait> {
spawner: Spawner, spawner: Spawner,
running: HashMap<Hash, JobHandle<Job::ToJob>>, running: HashMap<Hash, JobHandle<Job::ToJob>>,
#[pin]
outgoing_msgs: StreamUnordered<mpsc::Receiver<Job::FromJob>>, outgoing_msgs: StreamUnordered<mpsc::Receiver<Job::FromJob>>,
#[pin]
job: std::marker::PhantomData<Job>, job: std::marker::PhantomData<Job>,
errors: Option<mpsc::Sender<(Option<Hash>, JobsError<Job::Error>)>>, errors: Option<mpsc::Sender<(Option<Hash>, JobsError<Job::Error>)>>,
} }
...@@ -602,7 +574,6 @@ impl<Spawner: SpawnNamed, Job: 'static + JobTrait> Jobs<Spawner, Job> { ...@@ -602,7 +574,6 @@ impl<Spawner: SpawnNamed, Job: 'static + JobTrait> Jobs<Spawner, Job> {
let (from_job_tx, from_job_rx) = mpsc::channel(JOB_CHANNEL_CAPACITY); let (from_job_tx, from_job_rx) = mpsc::channel(JOB_CHANNEL_CAPACITY);
let (finished_tx, finished) = oneshot::channel(); let (finished_tx, finished) = oneshot::channel();
// clone the error transmitter to move into the future
let err_tx = self.errors.clone(); let err_tx = self.errors.clone();
let (future, abort_handle) = future::abortable(async move { let (future, abort_handle) = future::abortable(async move {
...@@ -625,7 +596,6 @@ impl<Spawner: SpawnNamed, Job: 'static + JobTrait> Jobs<Spawner, Job> { ...@@ -625,7 +596,6 @@ impl<Spawner: SpawnNamed, Job: 'static + JobTrait> Jobs<Spawner, Job> {
} }
}); });
// the spawn mechanism requires that the spawned future has no output
let future = async move { let future = async move {
// job errors are already handled within the future, meaning // job errors are already handled within the future, meaning
// that any errors here are due to the abortable mechanism. // that any errors here are due to the abortable mechanism.
...@@ -637,54 +607,33 @@ impl<Spawner: SpawnNamed, Job: 'static + JobTrait> Jobs<Spawner, Job> { ...@@ -637,54 +607,33 @@ impl<Spawner: SpawnNamed, Job: 'static + JobTrait> Jobs<Spawner, Job> {
}; };
self.spawner.spawn(Job::NAME, future.boxed()); self.spawner.spawn(Job::NAME, future.boxed());
// this handle lets us remove the appropriate receiver from self.outgoing_msgs self.outgoing_msgs.push(from_job_rx);
// when it's time to stop the job.
let outgoing_msgs_handle = self.outgoing_msgs.push(from_job_rx);
let handle = JobHandle { let handle = JobHandle {
abort_handle, _abort_handle: AbortOnDrop(abort_handle),
to_job: to_job_tx, to_job: to_job_tx,
finished, finished,
outgoing_msgs_handle,
}; };
let _ = self.running.insert(parent_hash, handle); self.running.insert(parent_hash, handle);
Ok(()) Ok(())
} }
/// Stop the job associated with this `parent_hash`. /// Stop the job associated with this `parent_hash`.
pub async fn stop_job(&mut self, parent_hash: Hash) -> Result<(), Error> { pub async fn stop_job(&mut self, parent_hash: Hash) {
match self.running.remove(&parent_hash) { if let Some(handle) = self.running.remove(&parent_hash) {
Some(handle) => { handle.stop().await;
let _ = Pin::new(&mut self.outgoing_msgs).remove(handle.outgoing_msgs_handle);
handle.stop().await;
Ok(())
}
None => Err(Error::JobNotFound(parent_hash)),
} }
} }
/// Send a message to the appropriate job for this `parent_hash`. /// Send a message to the appropriate job for this `parent_hash`.
/// Will not return an error if the job is not running. async fn send_msg(&mut self, parent_hash: Hash, msg: Job::ToJob) {
async fn send_msg(&mut self, parent_hash: Hash, msg: Job::ToJob) -> Result<(), Error> { if let Entry::Occupied(mut job) = self.running.entry(parent_hash) {
match self.running.get_mut(&parent_hash) { if job.get_mut().send_msg(msg).await.is_err() {
Some(job) => job.send_msg(msg).await?, log::debug!("failed to send message to job ({}), will remove it", Job::NAME);
None => { job.remove();
// don't bring down the subsystem, this can happen to due a race condition }
},
}
Ok(())
}
}
// Note that on drop, we don't have the chance to gracefully spin down each of the remaining handles;
// we just abort them all. Still better than letting them dangle.
#[pinned_drop]
impl<Spawner, Job: JobTrait> PinnedDrop for Jobs<Spawner, Job> {
fn drop(self: Pin<&mut Self>) {
for job_handle in self.running.values() {
job_handle.abort_handle.abort();
} }
} }
} }
...@@ -696,18 +645,18 @@ where ...@@ -696,18 +645,18 @@ where
{ {
type Item = Job::FromJob; type Item = Job::FromJob;
fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context) -> task::Poll<Option<Self::Item>> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
// pin-project the outgoing messages loop {
let result = self.project().outgoing_msgs.poll_next(cx).map(|opt| { match Pin::new(&mut self.outgoing_msgs).poll_next(cx) {
opt.and_then(|(stream_yield, _)| match stream_yield { Poll::Pending => return Poll::Pending,
StreamYield::Item(msg) => Some(msg), Poll::Ready(r) => match r.map(|v| v.0) {
StreamYield::Finished(_) => None, Some(StreamYield::Item(msg)) => return Poll::Ready(Some(msg)),
}) // If a job is finished, rerun the loop
}); Some(StreamYield::Finished(_)) => continue,
// we don't want the stream to end if the jobs are empty at some point // Don't end if there are no jobs running
match result { None => return Poll::Pending,
task::Poll::Ready(None) => task::Poll::Pending, }
otherwise => otherwise, }
} }
} }
} }
...@@ -790,8 +739,18 @@ where ...@@ -790,8 +739,18 @@ where
loop { loop {
select! { select! {
incoming = ctx.recv().fuse() => if Self::handle_incoming(incoming, &mut jobs, &run_args, &metrics, &mut err_tx).await { break }, incoming = ctx.recv().fuse() =>
outgoing = jobs.next().fuse() => Self::handle_outgoing(outgoing, &mut ctx, &mut err_tx).await, if Self::handle_incoming(
incoming,
&mut jobs,
&run_args,
&metrics,
&mut err_tx,
).await {
break
},
outgoing = jobs.next().fuse() =>