Unverified Commit 8e4e79f5 authored by Andronik Ordian's avatar Andronik Ordian Committed by GitHub
Browse files

utils: handle race condition gracefully (#1583)

* utils: handle race condition gracefully

* utils: add a test

* update Cargo.lock

* utils: remove a warning

* utils: init logger in tests

* utils: update the outdated comment

* util: wait for both subsystem and test_future to finish

* Revert "util: wait for both subsystem and test_future to finish"

This reverts commit 075b3924.
parent bf7ccb84
Pipeline #104468 passed with stages
in 27 minutes and 20 seconds
...@@ -4891,6 +4891,7 @@ dependencies = [ ...@@ -4891,6 +4891,7 @@ dependencies = [
"assert_matches", "assert_matches",
"async-trait", "async-trait",
"derive_more 0.99.9", "derive_more 0.99.9",
"env_logger",
"futures 0.3.5", "futures 0.3.5",
"futures-timer 3.0.2", "futures-timer 3.0.2",
"log 0.4.11", "log 0.4.11",
......
...@@ -30,3 +30,4 @@ async-trait = "0.1" ...@@ -30,3 +30,4 @@ async-trait = "0.1"
futures = { version = "0.3.5", features = ["thread-pool"] } futures = { version = "0.3.5", features = ["thread-pool"] }
parking_lot = "0.10.0" parking_lot = "0.10.0"
polkadot-node-subsystem-test-helpers = { path = "../subsystem-test-helpers" } polkadot-node-subsystem-test-helpers = { path = "../subsystem-test-helpers" }
env_logger = "0.7.1"
...@@ -611,10 +611,13 @@ impl<Spawner: SpawnNamed, Job: 'static + JobTrait> Jobs<Spawner, Job> { ...@@ -611,10 +611,13 @@ impl<Spawner: SpawnNamed, Job: 'static + JobTrait> Jobs<Spawner, Job> {
} }
/// Send a message to the appropriate job for this `parent_hash`. /// Send a message to the appropriate job for this `parent_hash`.
/// Will not return an error if the job is not running.
async fn send_msg(&mut self, parent_hash: Hash, msg: Job::ToJob) -> Result<(), Error> { async fn send_msg(&mut self, parent_hash: Hash, msg: Job::ToJob) -> Result<(), Error> {
match self.running.get_mut(&parent_hash) { match self.running.get_mut(&parent_hash) {
Some(job) => job.send_msg(msg).await?, Some(job) => job.send_msg(msg).await?,
None => return Err(Error::JobNotFound(parent_hash)), None => {
// don't bring down the subsystem, this can happen to due a race condition
},
} }
Ok(()) Ok(())
} }
...@@ -1140,6 +1143,14 @@ mod tests { ...@@ -1140,6 +1143,14 @@ mod tests {
run_args: HashMap<Hash, Vec<FromJob>>, run_args: HashMap<Hash, Vec<FromJob>>,
test: impl FnOnce(OverseerHandle, mpsc::Receiver<(Option<Hash>, JobsError<Error>)>) -> T, test: impl FnOnce(OverseerHandle, mpsc::Receiver<(Option<Hash>, JobsError<Error>)>) -> T,
) { ) {
let _ = env_logger::builder()
.is_test(true)
.filter(
None,
log::LevelFilter::Trace,
)
.try_init();
let pool = sp_core::testing::TaskExecutor::new(); let pool = sp_core::testing::TaskExecutor::new();
let (context, overseer_handle) = make_subsystem_context(pool.clone()); let (context, overseer_handle) = make_subsystem_context(pool.clone());
let (err_tx, err_rx) = mpsc::channel(16); let (err_tx, err_rx) = mpsc::channel(16);
...@@ -1148,9 +1159,7 @@ mod tests { ...@@ -1148,9 +1159,7 @@ mod tests {
let test_future = test(overseer_handle, err_rx); let test_future = test(overseer_handle, err_rx);
let timeout = Delay::new(Duration::from_secs(2)); let timeout = Delay::new(Duration::from_secs(2));
futures::pin_mut!(test_future); futures::pin_mut!(subsystem, test_future, timeout);
futures::pin_mut!(subsystem);
futures::pin_mut!(timeout);
executor::block_on(async move { executor::block_on(async move {
futures::select! { futures::select! {
...@@ -1213,6 +1222,29 @@ mod tests { ...@@ -1213,6 +1222,29 @@ mod tests {
}); });
} }
#[test]
fn sending_to_a_non_running_job_do_not_stop_the_subsystem() {
let run_args = HashMap::new();
test_harness(run_args, |mut overseer_handle, err_rx| async move {
// send to a non running job
overseer_handle
.send(FromOverseer::Communication {
msg: Default::default(),
})
.await;
// the subsystem is still alive
assert_matches!(
overseer_handle.recv().await,
AllMessages::CandidateSelection(_)
);
let errs: Vec<_> = err_rx.collect().await;
assert_eq!(errs.len(), 0);
});
}
#[test] #[test]
fn test_subsystem_impl_and_name_derivation() { fn test_subsystem_impl_and_name_derivation() {
let pool = sp_core::testing::TaskExecutor::new(); let pool = sp_core::testing::TaskExecutor::new();
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment