Unverified Commit 26493b44 authored by Andronik Ordian's avatar Andronik Ordian Committed by GitHub
Browse files

jobs: don't early exit when there are no jobs (#1621)

* jobs: don't early exit when there are no jobs

* utils: fix merged test

* utils: less verbose

* utils: add an assert subsystem is running

* utils: use TimeoutExt from test-helpers

* test-helpers: use TimeoutExt
parent 8e4e79f5
Pipeline #104528 passed with stages
in 28 minutes and 23 seconds
......@@ -275,16 +275,13 @@ pub fn subsystem_test_harness<M, OverseerFactory, Overseer, TestFactory, Test>(
let overseer = overseer_factory(handle);
let test = test_factory(context);
let timeout = Delay::new(Duration::from_secs(2));
futures::pin_mut!(overseer, test, timeout);
futures::pin_mut!(overseer, test);
futures::executor::block_on(async move {
futures::select! {
_ = overseer.fuse() => (),
_ = test.fuse() => (),
_ = timeout.fuse() => panic!("test timed out instead of completing"),
}
future::join(overseer, test)
.timeout(Duration::from_secs(2))
.await
.expect("test timed out instead of completing")
});
}
......
......@@ -643,12 +643,17 @@ where
fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context) -> task::Poll<Option<Self::Item>> {
// pin-project the outgoing messages
self.project().outgoing_msgs.poll_next(cx).map(|opt| {
let result = self.project().outgoing_msgs.poll_next(cx).map(|opt| {
opt.and_then(|(stream_yield, _)| match stream_yield {
StreamYield::Item(msg) => Some(msg),
StreamYield::Finished(_) => None,
})
})
});
// we don't want the stream to end if the jobs are empty at some point
match result {
task::Poll::Ready(None) => task::Poll::Pending,
otherwise => otherwise,
}
}
}
......@@ -731,7 +736,7 @@ where
loop {
select! {
incoming = ctx.recv().fuse() => if Self::handle_incoming(incoming, &mut jobs, &run_args, &metrics, &mut err_tx).await { break },
outgoing = jobs.next().fuse() => if Self::handle_outgoing(outgoing, &mut ctx, &mut err_tx).await { break },
outgoing = jobs.next().fuse() => Self::handle_outgoing(outgoing, &mut ctx, &mut err_tx).await,
complete => break,
}
}
......@@ -841,21 +846,16 @@ where
false
}
// handle an outgoing message. return true if we should break afterwards.
// handle an outgoing message.
async fn handle_outgoing(
outgoing: Option<Job::FromJob>,
ctx: &mut Context,
err_tx: &mut Option<mpsc::Sender<(Option<Hash>, JobsError<Job::Error>)>>,
) -> bool {
match outgoing {
Some(msg) => {
if let Err(e) = ctx.send_message(msg.into()).await {
Self::fwd_err(None, Error::from(e).into(), err_tx).await;
}
}
None => return true,
) {
let msg = outgoing.expect("the Jobs stream never ends; qed");
if let Err(e) = ctx.send_message(msg.into()).await {
Self::fwd_err(None, Error::from(e).into(), err_tx).await;
}
false
}
}
......@@ -985,11 +985,10 @@ mod tests {
channel::mpsc,
executor,
stream::{self, StreamExt},
Future, FutureExt, SinkExt,
future, Future, FutureExt, SinkExt,
};
use futures_timer::Delay;
use polkadot_primitives::v1::Hash;
use polkadot_node_subsystem_test_helpers::{self as test_helpers, make_subsystem_context};
use polkadot_node_subsystem_test_helpers::{self as test_helpers, make_subsystem_context, TimeoutExt as _};
use std::{collections::HashMap, convert::TryFrom, pin::Pin, time::Duration};
// basic usage: in a nutshell, when you want to define a subsystem, just focus on what its jobs do;
......@@ -1157,16 +1156,14 @@ mod tests {
let subsystem = FakeCandidateSelectionSubsystem::run(context, run_args, (), pool, Some(err_tx));
let test_future = test(overseer_handle, err_rx);
let timeout = Delay::new(Duration::from_secs(2));
futures::pin_mut!(subsystem, test_future, timeout);
futures::pin_mut!(subsystem, test_future);
executor::block_on(async move {
futures::select! {
_ = test_future.fuse() => (),
_ = subsystem.fuse() => (),
_ = timeout.fuse() => panic!("test timed out instead of completing"),
}
future::join(subsystem, test_future)
.timeout(Duration::from_secs(2))
.await
.expect("test timed out instead of completing")
});
}
......@@ -1195,6 +1192,10 @@ mod tests {
)))
.await;
overseer_handle
.send(FromOverseer::Signal(OverseerSignal::Conclude))
.await;
let errs: Vec<_> = err_rx.collect().await;
assert_eq!(errs.len(), 0);
});
......@@ -1224,12 +1225,23 @@ mod tests {
#[test]
fn sending_to_a_non_running_job_do_not_stop_the_subsystem() {
let run_args = HashMap::new();
let relay_parent = Hash::repeat_byte(0x01);
let mut run_args = HashMap::new();
run_args.insert(
relay_parent.clone(),
vec![FromJob::Test],
);
test_harness(run_args, |mut overseer_handle, err_rx| async move {
overseer_handle
.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(
ActiveLeavesUpdate::start_work(relay_parent),
)))
.await;
// send to a non running job
overseer_handle
.send(FromOverseer::Communication {
.send(FromOverseer::Communication {
msg: Default::default(),
})
.await;
......@@ -1240,6 +1252,10 @@ mod tests {
AllMessages::CandidateSelection(_)
);
overseer_handle
.send(FromOverseer::Signal(OverseerSignal::Conclude))
.await;
let errs: Vec<_> = err_rx.collect().await;
assert_eq!(errs.len(), 0);
});
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment