From 25239c8642481c61a36c733b6d79a08b27faa00d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Bastian=20K=C3=B6cher?= <bkchr@users.noreply.github.com>
Date: Fri, 19 Nov 2021 21:30:37 +0100
Subject: [PATCH] Taskmanager: Remove `clean_shutdown` (#10314)

There is no reason for this function, tokio already blocks automatically until all tasks are ended.
Another reason to remove this feature is `mpsc_background_tasks` unbounded channel. Recently this
channel was reporting too many unprocessed elements. We assume that this was a result of a lot of
very shot lived tasks that somehow flooded this channel.
---
 substrate/client/cli/src/runner.rs            |   3 +-
 .../client/service/src/task_manager/mod.rs    |  83 +----
 .../client/service/src/task_manager/tests.rs  | 316 +++++++-----------
 substrate/test-utils/test-runner/src/node.rs  |   2 -
 4 files changed, 140 insertions(+), 264 deletions(-)

diff --git a/substrate/client/cli/src/runner.rs b/substrate/client/cli/src/runner.rs
index 6f03e02a12d..640b87584d4 100644
--- a/substrate/client/cli/src/runner.rs
+++ b/substrate/client/cli/src/runner.rs
@@ -98,7 +98,7 @@ where
 	pin_mut!(f);
 
 	tokio_runtime.block_on(main(f))?;
-	tokio_runtime.block_on(task_manager.clean_shutdown());
+	drop(task_manager);
 
 	Ok(())
 }
@@ -154,7 +154,6 @@ impl<C: SubstrateCli> Runner<C> {
 		self.print_node_infos();
 		let mut task_manager = self.tokio_runtime.block_on(initialize(self.config))?;
 		let res = self.tokio_runtime.block_on(main(task_manager.future().fuse()));
-		self.tokio_runtime.block_on(task_manager.clean_shutdown());
 		Ok(res?)
 	}
 
diff --git a/substrate/client/service/src/task_manager/mod.rs b/substrate/client/service/src/task_manager/mod.rs
index 25d3ecd7d5c..342ea6627be 100644
--- a/substrate/client/service/src/task_manager/mod.rs
+++ b/substrate/client/service/src/task_manager/mod.rs
@@ -21,17 +21,16 @@
 use crate::{config::TaskType, Error};
 use exit_future::Signal;
 use futures::{
-	future::{join_all, pending, select, try_join_all, BoxFuture, Either},
+	future::{pending, select, try_join_all, BoxFuture, Either},
 	Future, FutureExt, StreamExt,
 };
-use log::debug;
 use prometheus_endpoint::{
 	exponential_buckets, register, CounterVec, HistogramOpts, HistogramVec, Opts, PrometheusError,
 	Registry, U64,
 };
 use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
 use std::{panic, pin::Pin, result::Result};
-use tokio::{runtime::Handle, task::JoinHandle};
+use tokio::runtime::Handle;
 use tracing_futures::Instrument;
 
 mod prometheus_future;
@@ -73,7 +72,6 @@ pub struct SpawnTaskHandle {
 	on_exit: exit_future::Exit,
 	tokio_handle: Handle,
 	metrics: Option<Metrics>,
-	task_notifier: TracingUnboundedSender<JoinHandle<()>>,
 }
 
 impl SpawnTaskHandle {
@@ -113,11 +111,6 @@ impl SpawnTaskHandle {
 		task: impl Future<Output = ()> + Send + 'static,
 		task_type: TaskType,
 	) {
-		if self.task_notifier.is_closed() {
-			debug!("Attempt to spawn a new task has been prevented: {}", name);
-			return
-		}
-
 		let on_exit = self.on_exit.clone();
 		let metrics = self.metrics.clone();
 
@@ -169,17 +162,17 @@ impl SpawnTaskHandle {
 		}
 		.in_current_span();
 
-		let join_handle = match task_type {
-			TaskType::Async => self.tokio_handle.spawn(future),
+		match task_type {
+			TaskType::Async => {
+				self.tokio_handle.spawn(future);
+			},
 			TaskType::Blocking => {
 				let handle = self.tokio_handle.clone();
 				self.tokio_handle.spawn_blocking(move || {
 					handle.block_on(future);
-				})
+				});
 			},
-		};
-
-		let _ = self.task_notifier.unbounded_send(join_handle);
+		}
 	}
 }
 
@@ -288,8 +281,8 @@ pub struct TaskManager {
 	/// A future that resolves when the service has exited, this is useful to
 	/// make sure any internally spawned futures stop when the service does.
 	on_exit: exit_future::Exit,
-	/// A signal that makes the exit future above resolve, fired on service drop.
-	signal: Option<Signal>,
+	/// A signal that makes the exit future above resolve, fired on drop.
+	_signal: Signal,
 	/// Tokio runtime handle that is used to spawn futures.
 	tokio_handle: Handle,
 	/// Prometheus metric where to report the polling times.
@@ -301,10 +294,6 @@ pub struct TaskManager {
 	essential_failed_rx: TracingUnboundedReceiver<()>,
 	/// Things to keep alive until the task manager is dropped.
 	keep_alive: Box<dyn std::any::Any + Send>,
-	/// A sender to a stream of background tasks. This is used for the completion future.
-	task_notifier: TracingUnboundedSender<JoinHandle<()>>,
-	/// This future will complete when all the tasks are joined and the stream is closed.
-	completion_future: JoinHandle<()>,
 	/// A list of other `TaskManager`'s to terminate and gracefully shutdown when the parent
 	/// terminates and gracefully shutdown. Also ends the parent `future()` if a child's essential
 	/// task fails.
@@ -325,25 +314,14 @@ impl TaskManager {
 
 		let metrics = prometheus_registry.map(Metrics::register).transpose()?;
 
-		let (task_notifier, background_tasks) = tracing_unbounded("mpsc_background_tasks");
-		// NOTE: for_each_concurrent will await on all the JoinHandle futures at the same time. It
-		// is possible to limit this but it's actually better for the memory foot print to await
-		// them all to not accumulate anything on that stream.
-		let completion_future =
-			tokio_handle.spawn(background_tasks.for_each_concurrent(None, |x| async move {
-				let _ = x.await;
-			}));
-
 		Ok(Self {
 			on_exit,
-			signal: Some(signal),
+			_signal: signal,
 			tokio_handle,
 			metrics,
 			essential_failed_tx,
 			essential_failed_rx,
 			keep_alive: Box::new(()),
-			task_notifier,
-			completion_future,
 			children: Vec::new(),
 		})
 	}
@@ -354,7 +332,6 @@ impl TaskManager {
 			on_exit: self.on_exit.clone(),
 			tokio_handle: self.tokio_handle.clone(),
 			metrics: self.metrics.clone(),
-			task_notifier: self.task_notifier.clone(),
 		}
 	}
 
@@ -363,36 +340,12 @@ impl TaskManager {
 		SpawnEssentialTaskHandle::new(self.essential_failed_tx.clone(), self.spawn_handle())
 	}
 
-	/// Send the signal for termination, prevent new tasks to be created, await for all the existing
-	/// tasks to be finished and drop the object. You can consider this as an async drop.
-	///
-	/// It's always better to call and await this function before exiting the process as background
-	/// tasks may be running in the background. If the process exit and the background tasks are not
-	/// cancelled, this will lead to objects not getting dropped properly.
-	///
-	/// This is an issue in some cases as some of our dependencies do require that we drop all the
-	/// objects properly otherwise it triggers a SIGABRT on exit.
-	pub fn clean_shutdown(mut self) -> Pin<Box<dyn Future<Output = ()> + Send>> {
-		self.terminate();
-		let children_shutdowns = self.children.into_iter().map(|x| x.clean_shutdown());
-		let keep_alive = self.keep_alive;
-		let completion_future = self.completion_future;
-
-		Box::pin(async move {
-			join_all(children_shutdowns).await;
-			let _ = completion_future.await;
-
-			let _ = keep_alive;
-		})
-	}
-
 	/// Return a future that will end with success if the signal to terminate was sent
 	/// (`self.terminate()`) or with an error if an essential task fails.
 	///
 	/// # Warning
 	///
-	/// This function will not wait until the end of the remaining task. You must call and await
-	/// `clean_shutdown()` after this.
+	/// This function will not wait until the end of the remaining task.
 	pub fn future<'a>(
 		&'a mut self,
 	) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'a>> {
@@ -417,18 +370,6 @@ impl TaskManager {
 		})
 	}
 
-	/// Signal to terminate all the running tasks.
-	pub fn terminate(&mut self) {
-		if let Some(signal) = self.signal.take() {
-			let _ = signal.fire();
-			// NOTE: this will prevent new tasks to be spawned
-			self.task_notifier.close_channel();
-			for child in self.children.iter_mut() {
-				child.terminate();
-			}
-		}
-	}
-
 	/// Set what the task manager should keep alive, can be called multiple times.
 	pub fn keep_alive<T: 'static + Send>(&mut self, to_keep_alive: T) {
 		// allows this fn to safely called multiple times.
diff --git a/substrate/client/service/src/task_manager/tests.rs b/substrate/client/service/src/task_manager/tests.rs
index 75092ff2ae6..f14023f34f6 100644
--- a/substrate/client/service/src/task_manager/tests.rs
+++ b/substrate/client/service/src/task_manager/tests.rs
@@ -90,225 +90,163 @@ fn new_task_manager(tokio_handle: tokio::runtime::Handle) -> TaskManager {
 
 #[test]
 fn ensure_tasks_are_awaited_on_shutdown() {
-	let runtime = tokio::runtime::Runtime::new().unwrap();
-	let handle = runtime.handle().clone();
-
-	let task_manager = new_task_manager(handle);
-	let spawn_handle = task_manager.spawn_handle();
 	let drop_tester = DropTester::new();
-	spawn_handle.spawn("task1", None, run_background_task(drop_tester.new_ref()));
-	spawn_handle.spawn("task2", None, run_background_task(drop_tester.new_ref()));
-	assert_eq!(drop_tester, 2);
-	// allow the tasks to even start
-	runtime.block_on(async { tokio::time::sleep(Duration::from_secs(1)).await });
-	assert_eq!(drop_tester, 2);
-	runtime.block_on(task_manager.clean_shutdown());
+	{
+		let runtime = tokio::runtime::Runtime::new().unwrap();
+		let handle = runtime.handle().clone();
+
+		let task_manager = new_task_manager(handle);
+		let spawn_handle = task_manager.spawn_handle();
+		spawn_handle.spawn("task1", None, run_background_task(drop_tester.new_ref()));
+		spawn_handle.spawn("task2", None, run_background_task(drop_tester.new_ref()));
+		assert_eq!(drop_tester, 2);
+		// allow the tasks to even start
+		runtime.block_on(async { tokio::time::sleep(Duration::from_secs(1)).await });
+		assert_eq!(drop_tester, 2);
+	}
 	drop_tester.wait_on_drop();
 }
 
 #[test]
 fn ensure_keep_alive_during_shutdown() {
-	let runtime = tokio::runtime::Runtime::new().unwrap();
-	let handle = runtime.handle().clone();
-
-	let mut task_manager = new_task_manager(handle);
-	let spawn_handle = task_manager.spawn_handle();
 	let drop_tester = DropTester::new();
-	task_manager.keep_alive(drop_tester.new_ref());
-	spawn_handle.spawn("task1", None, run_background_task(()));
-	assert_eq!(drop_tester, 1);
-	// allow the tasks to even start
-	runtime.block_on(async { tokio::time::sleep(Duration::from_secs(1)).await });
-	assert_eq!(drop_tester, 1);
-	runtime.block_on(task_manager.clean_shutdown());
+	{
+		let runtime = tokio::runtime::Runtime::new().unwrap();
+		let handle = runtime.handle().clone();
+
+		let mut task_manager = new_task_manager(handle);
+		let spawn_handle = task_manager.spawn_handle();
+		task_manager.keep_alive(drop_tester.new_ref());
+		spawn_handle.spawn("task1", None, run_background_task(()));
+		assert_eq!(drop_tester, 1);
+		// allow the tasks to even start
+		runtime.block_on(async { tokio::time::sleep(Duration::from_secs(1)).await });
+		assert_eq!(drop_tester, 1);
+	}
 	drop_tester.wait_on_drop();
 }
 
 #[test]
 fn ensure_blocking_futures_are_awaited_on_shutdown() {
-	let runtime = tokio::runtime::Runtime::new().unwrap();
-	let handle = runtime.handle().clone();
-
-	let task_manager = new_task_manager(handle);
-	let spawn_handle = task_manager.spawn_handle();
-	let drop_tester = DropTester::new();
-	spawn_handle.spawn(
-		"task1",
-		None,
-		run_background_task_blocking(Duration::from_secs(3), drop_tester.new_ref()),
-	);
-	spawn_handle.spawn(
-		"task2",
-		None,
-		run_background_task_blocking(Duration::from_secs(3), drop_tester.new_ref()),
-	);
-	assert_eq!(drop_tester, 2);
-	// allow the tasks to even start
-	runtime.block_on(async { tokio::time::sleep(Duration::from_secs(1)).await });
-	assert_eq!(drop_tester, 2);
-	runtime.block_on(task_manager.clean_shutdown());
-	assert_eq!(drop_tester, 0);
-}
-
-#[test]
-fn ensure_no_task_can_be_spawn_after_terminate() {
-	let runtime = tokio::runtime::Runtime::new().unwrap();
-	let handle = runtime.handle().clone();
-
-	let mut task_manager = new_task_manager(handle);
-	let spawn_handle = task_manager.spawn_handle();
-	let drop_tester = DropTester::new();
-	spawn_handle.spawn("task1", None, run_background_task(drop_tester.new_ref()));
-	spawn_handle.spawn("task2", None, run_background_task(drop_tester.new_ref()));
-	assert_eq!(drop_tester, 2);
-	// allow the tasks to even start
-	runtime.block_on(async { tokio::time::sleep(Duration::from_secs(1)).await });
-	assert_eq!(drop_tester, 2);
-	task_manager.terminate();
-	spawn_handle.spawn("task3", None, run_background_task(drop_tester.new_ref()));
-	runtime.block_on(task_manager.clean_shutdown());
-	drop_tester.wait_on_drop();
-}
-
-#[test]
-fn ensure_task_manager_future_ends_when_task_manager_terminated() {
-	let runtime = tokio::runtime::Runtime::new().unwrap();
-	let handle = runtime.handle().clone();
-
-	let mut task_manager = new_task_manager(handle);
-	let spawn_handle = task_manager.spawn_handle();
 	let drop_tester = DropTester::new();
-	spawn_handle.spawn("task1", None, run_background_task(drop_tester.new_ref()));
-	spawn_handle.spawn("task2", None, run_background_task(drop_tester.new_ref()));
-	assert_eq!(drop_tester, 2);
-	// allow the tasks to even start
-	runtime.block_on(async { tokio::time::sleep(Duration::from_secs(1)).await });
-	assert_eq!(drop_tester, 2);
-	task_manager.terminate();
-	runtime.block_on(task_manager.future()).expect("future has ended without error");
-	runtime.block_on(task_manager.clean_shutdown());
+	{
+		let runtime = tokio::runtime::Runtime::new().unwrap();
+		let handle = runtime.handle().clone();
+
+		let task_manager = new_task_manager(handle);
+		let spawn_handle = task_manager.spawn_handle();
+		spawn_handle.spawn(
+			"task1",
+			None,
+			run_background_task_blocking(Duration::from_secs(3), drop_tester.new_ref()),
+		);
+		spawn_handle.spawn(
+			"task2",
+			None,
+			run_background_task_blocking(Duration::from_secs(3), drop_tester.new_ref()),
+		);
+		assert_eq!(drop_tester, 2);
+		// allow the tasks to even start
+		runtime.block_on(async { tokio::time::sleep(Duration::from_secs(1)).await });
+		assert_eq!(drop_tester, 2);
+	}
 	assert_eq!(drop_tester, 0);
 }
 
 #[test]
 fn ensure_task_manager_future_ends_with_error_when_essential_task_fails() {
-	let runtime = tokio::runtime::Runtime::new().unwrap();
-	let handle = runtime.handle().clone();
-
-	let mut task_manager = new_task_manager(handle);
-	let spawn_handle = task_manager.spawn_handle();
-	let spawn_essential_handle = task_manager.spawn_essential_handle();
 	let drop_tester = DropTester::new();
-	spawn_handle.spawn("task1", None, run_background_task(drop_tester.new_ref()));
-	spawn_handle.spawn("task2", None, run_background_task(drop_tester.new_ref()));
-	assert_eq!(drop_tester, 2);
-	// allow the tasks to even start
-	runtime.block_on(async { tokio::time::sleep(Duration::from_secs(1)).await });
-	assert_eq!(drop_tester, 2);
-	spawn_essential_handle.spawn("task3", None, async { panic!("task failed") });
-	runtime
-		.block_on(task_manager.future())
-		.expect_err("future()'s Result must be Err");
-	assert_eq!(drop_tester, 2);
-	runtime.block_on(task_manager.clean_shutdown());
-	drop_tester.wait_on_drop();
-}
-
-#[test]
-fn ensure_children_tasks_ends_when_task_manager_terminated() {
-	let runtime = tokio::runtime::Runtime::new().unwrap();
-	let handle = runtime.handle().clone();
-
-	let mut task_manager = new_task_manager(handle.clone());
-	let child_1 = new_task_manager(handle.clone());
-	let spawn_handle_child_1 = child_1.spawn_handle();
-	let child_2 = new_task_manager(handle.clone());
-	let spawn_handle_child_2 = child_2.spawn_handle();
-	task_manager.add_child(child_1);
-	task_manager.add_child(child_2);
-	let spawn_handle = task_manager.spawn_handle();
-	let drop_tester = DropTester::new();
-	spawn_handle.spawn("task1", None, run_background_task(drop_tester.new_ref()));
-	spawn_handle.spawn("task2", None, run_background_task(drop_tester.new_ref()));
-	spawn_handle_child_1.spawn("task3", None, run_background_task(drop_tester.new_ref()));
-	spawn_handle_child_2.spawn("task4", None, run_background_task(drop_tester.new_ref()));
-	assert_eq!(drop_tester, 4);
-	// allow the tasks to even start
-	runtime.block_on(async { tokio::time::sleep(Duration::from_secs(1)).await });
-	assert_eq!(drop_tester, 4);
-	task_manager.terminate();
-	runtime.block_on(task_manager.future()).expect("future has ended without error");
-	runtime.block_on(task_manager.clean_shutdown());
+	{
+		let runtime = tokio::runtime::Runtime::new().unwrap();
+		let handle = runtime.handle().clone();
+
+		let mut task_manager = new_task_manager(handle);
+		let spawn_handle = task_manager.spawn_handle();
+		let spawn_essential_handle = task_manager.spawn_essential_handle();
+		spawn_handle.spawn("task1", None, run_background_task(drop_tester.new_ref()));
+		spawn_handle.spawn("task2", None, run_background_task(drop_tester.new_ref()));
+		assert_eq!(drop_tester, 2);
+		// allow the tasks to even start
+		runtime.block_on(async { tokio::time::sleep(Duration::from_secs(1)).await });
+		assert_eq!(drop_tester, 2);
+		spawn_essential_handle.spawn("task3", None, async { panic!("task failed") });
+		runtime
+			.block_on(task_manager.future())
+			.expect_err("future()'s Result must be Err");
+		assert_eq!(drop_tester, 2);
+	}
 	drop_tester.wait_on_drop();
 }
 
 #[test]
 fn ensure_task_manager_future_ends_with_error_when_childs_essential_task_fails() {
-	let runtime = tokio::runtime::Runtime::new().unwrap();
-	let handle = runtime.handle().clone();
-
-	let mut task_manager = new_task_manager(handle.clone());
-	let child_1 = new_task_manager(handle.clone());
-	let spawn_handle_child_1 = child_1.spawn_handle();
-	let spawn_essential_handle_child_1 = child_1.spawn_essential_handle();
-	let child_2 = new_task_manager(handle.clone());
-	let spawn_handle_child_2 = child_2.spawn_handle();
-	task_manager.add_child(child_1);
-	task_manager.add_child(child_2);
-	let spawn_handle = task_manager.spawn_handle();
 	let drop_tester = DropTester::new();
-	spawn_handle.spawn("task1", None, run_background_task(drop_tester.new_ref()));
-	spawn_handle.spawn("task2", None, run_background_task(drop_tester.new_ref()));
-	spawn_handle_child_1.spawn("task3", None, run_background_task(drop_tester.new_ref()));
-	spawn_handle_child_2.spawn("task4", None, run_background_task(drop_tester.new_ref()));
-	assert_eq!(drop_tester, 4);
-	// allow the tasks to even start
-	runtime.block_on(async { tokio::time::sleep(Duration::from_secs(1)).await });
-	assert_eq!(drop_tester, 4);
-	spawn_essential_handle_child_1.spawn("task5", None, async { panic!("task failed") });
-	runtime
-		.block_on(task_manager.future())
-		.expect_err("future()'s Result must be Err");
-	assert_eq!(drop_tester, 4);
-	runtime.block_on(task_manager.clean_shutdown());
+	{
+		let runtime = tokio::runtime::Runtime::new().unwrap();
+		let handle = runtime.handle().clone();
+
+		let mut task_manager = new_task_manager(handle.clone());
+		let child_1 = new_task_manager(handle.clone());
+		let spawn_handle_child_1 = child_1.spawn_handle();
+		let spawn_essential_handle_child_1 = child_1.spawn_essential_handle();
+		let child_2 = new_task_manager(handle.clone());
+		let spawn_handle_child_2 = child_2.spawn_handle();
+		task_manager.add_child(child_1);
+		task_manager.add_child(child_2);
+		let spawn_handle = task_manager.spawn_handle();
+		spawn_handle.spawn("task1", None, run_background_task(drop_tester.new_ref()));
+		spawn_handle.spawn("task2", None, run_background_task(drop_tester.new_ref()));
+		spawn_handle_child_1.spawn("task3", None, run_background_task(drop_tester.new_ref()));
+		spawn_handle_child_2.spawn("task4", None, run_background_task(drop_tester.new_ref()));
+		assert_eq!(drop_tester, 4);
+		// allow the tasks to even start
+		runtime.block_on(async { tokio::time::sleep(Duration::from_secs(1)).await });
+		assert_eq!(drop_tester, 4);
+		spawn_essential_handle_child_1.spawn("task5", None, async { panic!("task failed") });
+		runtime
+			.block_on(task_manager.future())
+			.expect_err("future()'s Result must be Err");
+		assert_eq!(drop_tester, 4);
+	}
 	drop_tester.wait_on_drop();
 }
 
 #[test]
 fn ensure_task_manager_future_continues_when_childs_not_essential_task_fails() {
-	let runtime = tokio::runtime::Runtime::new().unwrap();
-	let handle = runtime.handle().clone();
-
-	let mut task_manager = new_task_manager(handle.clone());
-	let child_1 = new_task_manager(handle.clone());
-	let spawn_handle_child_1 = child_1.spawn_handle();
-	let child_2 = new_task_manager(handle.clone());
-	let spawn_handle_child_2 = child_2.spawn_handle();
-	task_manager.add_child(child_1);
-	task_manager.add_child(child_2);
-	let spawn_handle = task_manager.spawn_handle();
 	let drop_tester = DropTester::new();
-	spawn_handle.spawn("task1", None, run_background_task(drop_tester.new_ref()));
-	spawn_handle.spawn("task2", None, run_background_task(drop_tester.new_ref()));
-	spawn_handle_child_1.spawn("task3", None, run_background_task(drop_tester.new_ref()));
-	spawn_handle_child_2.spawn("task4", None, run_background_task(drop_tester.new_ref()));
-	assert_eq!(drop_tester, 4);
-	// allow the tasks to even start
-	runtime.block_on(async { tokio::time::sleep(Duration::from_secs(1)).await });
-	assert_eq!(drop_tester, 4);
-	spawn_handle_child_1.spawn("task5", None, async { panic!("task failed") });
-	runtime.block_on(async {
-		let t1 = task_manager.future().fuse();
-		let t2 = tokio::time::sleep(Duration::from_secs(3)).fuse();
-
-		pin_mut!(t1, t2);
-
-		select! {
-			res = t1 => panic!("task should not have stopped: {:?}", res),
-			_ = t2 => {},
-		}
-	});
-	assert_eq!(drop_tester, 4);
-	runtime.block_on(task_manager.clean_shutdown());
+	{
+		let runtime = tokio::runtime::Runtime::new().unwrap();
+		let handle = runtime.handle().clone();
+
+		let mut task_manager = new_task_manager(handle.clone());
+		let child_1 = new_task_manager(handle.clone());
+		let spawn_handle_child_1 = child_1.spawn_handle();
+		let child_2 = new_task_manager(handle.clone());
+		let spawn_handle_child_2 = child_2.spawn_handle();
+		task_manager.add_child(child_1);
+		task_manager.add_child(child_2);
+		let spawn_handle = task_manager.spawn_handle();
+		spawn_handle.spawn("task1", None, run_background_task(drop_tester.new_ref()));
+		spawn_handle.spawn("task2", None, run_background_task(drop_tester.new_ref()));
+		spawn_handle_child_1.spawn("task3", None, run_background_task(drop_tester.new_ref()));
+		spawn_handle_child_2.spawn("task4", None, run_background_task(drop_tester.new_ref()));
+		assert_eq!(drop_tester, 4);
+		// allow the tasks to even start
+		runtime.block_on(async { tokio::time::sleep(Duration::from_secs(1)).await });
+		assert_eq!(drop_tester, 4);
+		spawn_handle_child_1.spawn("task5", None, async { panic!("task failed") });
+		runtime.block_on(async {
+			let t1 = task_manager.future().fuse();
+			let t2 = tokio::time::sleep(Duration::from_secs(3)).fuse();
+
+			pin_mut!(t1, t2);
+
+			select! {
+				res = t1 => panic!("task should not have stopped: {:?}", res),
+				_ = t2 => {},
+			}
+		});
+		assert_eq!(drop_tester, 4);
+	}
 	drop_tester.wait_on_drop();
 }
diff --git a/substrate/test-utils/test-runner/src/node.rs b/substrate/test-utils/test-runner/src/node.rs
index 07259263c5e..5fd8e4669c3 100644
--- a/substrate/test-utils/test-runner/src/node.rs
+++ b/substrate/test-utils/test-runner/src/node.rs
@@ -261,8 +261,6 @@ where
 			let signal = tokio::signal::ctrl_c();
 			futures::pin_mut!(signal);
 			futures::future::select(task, signal).await;
-			// we don't really care whichever comes first.
-			task_manager.clean_shutdown().await
 		}
 	}
 }
-- 
GitLab