diff --git a/substrate/client/cli/src/runner.rs b/substrate/client/cli/src/runner.rs index 6f03e02a12d0560f90cef5eceb869b591cc2c196..640b87584d4b61f426956e7eafd5ccbb34fc9b99 100644 --- a/substrate/client/cli/src/runner.rs +++ b/substrate/client/cli/src/runner.rs @@ -98,7 +98,7 @@ where pin_mut!(f); tokio_runtime.block_on(main(f))?; - tokio_runtime.block_on(task_manager.clean_shutdown()); + drop(task_manager); Ok(()) } @@ -154,7 +154,6 @@ impl<C: SubstrateCli> Runner<C> { self.print_node_infos(); let mut task_manager = self.tokio_runtime.block_on(initialize(self.config))?; let res = self.tokio_runtime.block_on(main(task_manager.future().fuse())); - self.tokio_runtime.block_on(task_manager.clean_shutdown()); Ok(res?) } diff --git a/substrate/client/service/src/task_manager/mod.rs b/substrate/client/service/src/task_manager/mod.rs index 25d3ecd7d5c3c0e3712af54778926b8d3c85fe3a..342ea6627be6858a68370179e1baee755076c287 100644 --- a/substrate/client/service/src/task_manager/mod.rs +++ b/substrate/client/service/src/task_manager/mod.rs @@ -21,17 +21,16 @@ use crate::{config::TaskType, Error}; use exit_future::Signal; use futures::{ - future::{join_all, pending, select, try_join_all, BoxFuture, Either}, + future::{pending, select, try_join_all, BoxFuture, Either}, Future, FutureExt, StreamExt, }; -use log::debug; use prometheus_endpoint::{ exponential_buckets, register, CounterVec, HistogramOpts, HistogramVec, Opts, PrometheusError, Registry, U64, }; use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; use std::{panic, pin::Pin, result::Result}; -use tokio::{runtime::Handle, task::JoinHandle}; +use tokio::runtime::Handle; use tracing_futures::Instrument; mod prometheus_future; @@ -73,7 +72,6 @@ pub struct SpawnTaskHandle { on_exit: exit_future::Exit, tokio_handle: Handle, metrics: Option<Metrics>, - task_notifier: TracingUnboundedSender<JoinHandle<()>>, } impl SpawnTaskHandle { @@ -113,11 +111,6 @@ impl SpawnTaskHandle { task: impl Future<Output = ()> + Send + 'static, task_type: TaskType, ) { - if self.task_notifier.is_closed() { - debug!("Attempt to spawn a new task has been prevented: {}", name); - return - } - let on_exit = self.on_exit.clone(); let metrics = self.metrics.clone(); @@ -169,17 +162,17 @@ impl SpawnTaskHandle { } .in_current_span(); - let join_handle = match task_type { - TaskType::Async => self.tokio_handle.spawn(future), + match task_type { + TaskType::Async => { + self.tokio_handle.spawn(future); + }, TaskType::Blocking => { let handle = self.tokio_handle.clone(); self.tokio_handle.spawn_blocking(move || { handle.block_on(future); - }) + }); }, - }; - - let _ = self.task_notifier.unbounded_send(join_handle); + } } } @@ -288,8 +281,8 @@ pub struct TaskManager { /// A future that resolves when the service has exited, this is useful to /// make sure any internally spawned futures stop when the service does. on_exit: exit_future::Exit, - /// A signal that makes the exit future above resolve, fired on service drop. - signal: Option<Signal>, + /// A signal that makes the exit future above resolve, fired on drop. + _signal: Signal, /// Tokio runtime handle that is used to spawn futures. tokio_handle: Handle, /// Prometheus metric where to report the polling times. @@ -301,10 +294,6 @@ pub struct TaskManager { essential_failed_rx: TracingUnboundedReceiver<()>, /// Things to keep alive until the task manager is dropped. keep_alive: Box<dyn std::any::Any + Send>, - /// A sender to a stream of background tasks. This is used for the completion future. - task_notifier: TracingUnboundedSender<JoinHandle<()>>, - /// This future will complete when all the tasks are joined and the stream is closed. - completion_future: JoinHandle<()>, /// A list of other `TaskManager`'s to terminate and gracefully shutdown when the parent /// terminates and gracefully shutdown. Also ends the parent `future()` if a child's essential /// task fails. @@ -325,25 +314,14 @@ impl TaskManager { let metrics = prometheus_registry.map(Metrics::register).transpose()?; - let (task_notifier, background_tasks) = tracing_unbounded("mpsc_background_tasks"); - // NOTE: for_each_concurrent will await on all the JoinHandle futures at the same time. It - // is possible to limit this but it's actually better for the memory foot print to await - // them all to not accumulate anything on that stream. - let completion_future = - tokio_handle.spawn(background_tasks.for_each_concurrent(None, |x| async move { - let _ = x.await; - })); - Ok(Self { on_exit, - signal: Some(signal), + _signal: signal, tokio_handle, metrics, essential_failed_tx, essential_failed_rx, keep_alive: Box::new(()), - task_notifier, - completion_future, children: Vec::new(), }) } @@ -354,7 +332,6 @@ impl TaskManager { on_exit: self.on_exit.clone(), tokio_handle: self.tokio_handle.clone(), metrics: self.metrics.clone(), - task_notifier: self.task_notifier.clone(), } } @@ -363,36 +340,12 @@ impl TaskManager { SpawnEssentialTaskHandle::new(self.essential_failed_tx.clone(), self.spawn_handle()) } - /// Send the signal for termination, prevent new tasks to be created, await for all the existing - /// tasks to be finished and drop the object. You can consider this as an async drop. - /// - /// It's always better to call and await this function before exiting the process as background - /// tasks may be running in the background. If the process exit and the background tasks are not - /// cancelled, this will lead to objects not getting dropped properly. - /// - /// This is an issue in some cases as some of our dependencies do require that we drop all the - /// objects properly otherwise it triggers a SIGABRT on exit. - pub fn clean_shutdown(mut self) -> Pin<Box<dyn Future<Output = ()> + Send>> { - self.terminate(); - let children_shutdowns = self.children.into_iter().map(|x| x.clean_shutdown()); - let keep_alive = self.keep_alive; - let completion_future = self.completion_future; - - Box::pin(async move { - join_all(children_shutdowns).await; - let _ = completion_future.await; - - let _ = keep_alive; - }) - } - /// Return a future that will end with success if the signal to terminate was sent /// (`self.terminate()`) or with an error if an essential task fails. /// /// # Warning /// - /// This function will not wait until the end of the remaining task. You must call and await - /// `clean_shutdown()` after this. + /// This function will not wait until the end of the remaining task. pub fn future<'a>( &'a mut self, ) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'a>> { @@ -417,18 +370,6 @@ impl TaskManager { }) } - /// Signal to terminate all the running tasks. - pub fn terminate(&mut self) { - if let Some(signal) = self.signal.take() { - let _ = signal.fire(); - // NOTE: this will prevent new tasks to be spawned - self.task_notifier.close_channel(); - for child in self.children.iter_mut() { - child.terminate(); - } - } - } - /// Set what the task manager should keep alive, can be called multiple times. pub fn keep_alive<T: 'static + Send>(&mut self, to_keep_alive: T) { // allows this fn to safely called multiple times. diff --git a/substrate/client/service/src/task_manager/tests.rs b/substrate/client/service/src/task_manager/tests.rs index 75092ff2ae62e54fcdfe76edad3d7f24624afa50..f14023f34f6bd1ea3c2f8f23d1b98848c9dd618f 100644 --- a/substrate/client/service/src/task_manager/tests.rs +++ b/substrate/client/service/src/task_manager/tests.rs @@ -90,225 +90,163 @@ fn new_task_manager(tokio_handle: tokio::runtime::Handle) -> TaskManager { #[test] fn ensure_tasks_are_awaited_on_shutdown() { - let runtime = tokio::runtime::Runtime::new().unwrap(); - let handle = runtime.handle().clone(); - - let task_manager = new_task_manager(handle); - let spawn_handle = task_manager.spawn_handle(); let drop_tester = DropTester::new(); - spawn_handle.spawn("task1", None, run_background_task(drop_tester.new_ref())); - spawn_handle.spawn("task2", None, run_background_task(drop_tester.new_ref())); - assert_eq!(drop_tester, 2); - // allow the tasks to even start - runtime.block_on(async { tokio::time::sleep(Duration::from_secs(1)).await }); - assert_eq!(drop_tester, 2); - runtime.block_on(task_manager.clean_shutdown()); + { + let runtime = tokio::runtime::Runtime::new().unwrap(); + let handle = runtime.handle().clone(); + + let task_manager = new_task_manager(handle); + let spawn_handle = task_manager.spawn_handle(); + spawn_handle.spawn("task1", None, run_background_task(drop_tester.new_ref())); + spawn_handle.spawn("task2", None, run_background_task(drop_tester.new_ref())); + assert_eq!(drop_tester, 2); + // allow the tasks to even start + runtime.block_on(async { tokio::time::sleep(Duration::from_secs(1)).await }); + assert_eq!(drop_tester, 2); + } drop_tester.wait_on_drop(); } #[test] fn ensure_keep_alive_during_shutdown() { - let runtime = tokio::runtime::Runtime::new().unwrap(); - let handle = runtime.handle().clone(); - - let mut task_manager = new_task_manager(handle); - let spawn_handle = task_manager.spawn_handle(); let drop_tester = DropTester::new(); - task_manager.keep_alive(drop_tester.new_ref()); - spawn_handle.spawn("task1", None, run_background_task(())); - assert_eq!(drop_tester, 1); - // allow the tasks to even start - runtime.block_on(async { tokio::time::sleep(Duration::from_secs(1)).await }); - assert_eq!(drop_tester, 1); - runtime.block_on(task_manager.clean_shutdown()); + { + let runtime = tokio::runtime::Runtime::new().unwrap(); + let handle = runtime.handle().clone(); + + let mut task_manager = new_task_manager(handle); + let spawn_handle = task_manager.spawn_handle(); + task_manager.keep_alive(drop_tester.new_ref()); + spawn_handle.spawn("task1", None, run_background_task(())); + assert_eq!(drop_tester, 1); + // allow the tasks to even start + runtime.block_on(async { tokio::time::sleep(Duration::from_secs(1)).await }); + assert_eq!(drop_tester, 1); + } drop_tester.wait_on_drop(); } #[test] fn ensure_blocking_futures_are_awaited_on_shutdown() { - let runtime = tokio::runtime::Runtime::new().unwrap(); - let handle = runtime.handle().clone(); - - let task_manager = new_task_manager(handle); - let spawn_handle = task_manager.spawn_handle(); - let drop_tester = DropTester::new(); - spawn_handle.spawn( - "task1", - None, - run_background_task_blocking(Duration::from_secs(3), drop_tester.new_ref()), - ); - spawn_handle.spawn( - "task2", - None, - run_background_task_blocking(Duration::from_secs(3), drop_tester.new_ref()), - ); - assert_eq!(drop_tester, 2); - // allow the tasks to even start - runtime.block_on(async { tokio::time::sleep(Duration::from_secs(1)).await }); - assert_eq!(drop_tester, 2); - runtime.block_on(task_manager.clean_shutdown()); - assert_eq!(drop_tester, 0); -} - -#[test] -fn ensure_no_task_can_be_spawn_after_terminate() { - let runtime = tokio::runtime::Runtime::new().unwrap(); - let handle = runtime.handle().clone(); - - let mut task_manager = new_task_manager(handle); - let spawn_handle = task_manager.spawn_handle(); - let drop_tester = DropTester::new(); - spawn_handle.spawn("task1", None, run_background_task(drop_tester.new_ref())); - spawn_handle.spawn("task2", None, run_background_task(drop_tester.new_ref())); - assert_eq!(drop_tester, 2); - // allow the tasks to even start - runtime.block_on(async { tokio::time::sleep(Duration::from_secs(1)).await }); - assert_eq!(drop_tester, 2); - task_manager.terminate(); - spawn_handle.spawn("task3", None, run_background_task(drop_tester.new_ref())); - runtime.block_on(task_manager.clean_shutdown()); - drop_tester.wait_on_drop(); -} - -#[test] -fn ensure_task_manager_future_ends_when_task_manager_terminated() { - let runtime = tokio::runtime::Runtime::new().unwrap(); - let handle = runtime.handle().clone(); - - let mut task_manager = new_task_manager(handle); - let spawn_handle = task_manager.spawn_handle(); let drop_tester = DropTester::new(); - spawn_handle.spawn("task1", None, run_background_task(drop_tester.new_ref())); - spawn_handle.spawn("task2", None, run_background_task(drop_tester.new_ref())); - assert_eq!(drop_tester, 2); - // allow the tasks to even start - runtime.block_on(async { tokio::time::sleep(Duration::from_secs(1)).await }); - assert_eq!(drop_tester, 2); - task_manager.terminate(); - runtime.block_on(task_manager.future()).expect("future has ended without error"); - runtime.block_on(task_manager.clean_shutdown()); + { + let runtime = tokio::runtime::Runtime::new().unwrap(); + let handle = runtime.handle().clone(); + + let task_manager = new_task_manager(handle); + let spawn_handle = task_manager.spawn_handle(); + spawn_handle.spawn( + "task1", + None, + run_background_task_blocking(Duration::from_secs(3), drop_tester.new_ref()), + ); + spawn_handle.spawn( + "task2", + None, + run_background_task_blocking(Duration::from_secs(3), drop_tester.new_ref()), + ); + assert_eq!(drop_tester, 2); + // allow the tasks to even start + runtime.block_on(async { tokio::time::sleep(Duration::from_secs(1)).await }); + assert_eq!(drop_tester, 2); + } assert_eq!(drop_tester, 0); } #[test] fn ensure_task_manager_future_ends_with_error_when_essential_task_fails() { - let runtime = tokio::runtime::Runtime::new().unwrap(); - let handle = runtime.handle().clone(); - - let mut task_manager = new_task_manager(handle); - let spawn_handle = task_manager.spawn_handle(); - let spawn_essential_handle = task_manager.spawn_essential_handle(); let drop_tester = DropTester::new(); - spawn_handle.spawn("task1", None, run_background_task(drop_tester.new_ref())); - spawn_handle.spawn("task2", None, run_background_task(drop_tester.new_ref())); - assert_eq!(drop_tester, 2); - // allow the tasks to even start - runtime.block_on(async { tokio::time::sleep(Duration::from_secs(1)).await }); - assert_eq!(drop_tester, 2); - spawn_essential_handle.spawn("task3", None, async { panic!("task failed") }); - runtime - .block_on(task_manager.future()) - .expect_err("future()'s Result must be Err"); - assert_eq!(drop_tester, 2); - runtime.block_on(task_manager.clean_shutdown()); - drop_tester.wait_on_drop(); -} - -#[test] -fn ensure_children_tasks_ends_when_task_manager_terminated() { - let runtime = tokio::runtime::Runtime::new().unwrap(); - let handle = runtime.handle().clone(); - - let mut task_manager = new_task_manager(handle.clone()); - let child_1 = new_task_manager(handle.clone()); - let spawn_handle_child_1 = child_1.spawn_handle(); - let child_2 = new_task_manager(handle.clone()); - let spawn_handle_child_2 = child_2.spawn_handle(); - task_manager.add_child(child_1); - task_manager.add_child(child_2); - let spawn_handle = task_manager.spawn_handle(); - let drop_tester = DropTester::new(); - spawn_handle.spawn("task1", None, run_background_task(drop_tester.new_ref())); - spawn_handle.spawn("task2", None, run_background_task(drop_tester.new_ref())); - spawn_handle_child_1.spawn("task3", None, run_background_task(drop_tester.new_ref())); - spawn_handle_child_2.spawn("task4", None, run_background_task(drop_tester.new_ref())); - assert_eq!(drop_tester, 4); - // allow the tasks to even start - runtime.block_on(async { tokio::time::sleep(Duration::from_secs(1)).await }); - assert_eq!(drop_tester, 4); - task_manager.terminate(); - runtime.block_on(task_manager.future()).expect("future has ended without error"); - runtime.block_on(task_manager.clean_shutdown()); + { + let runtime = tokio::runtime::Runtime::new().unwrap(); + let handle = runtime.handle().clone(); + + let mut task_manager = new_task_manager(handle); + let spawn_handle = task_manager.spawn_handle(); + let spawn_essential_handle = task_manager.spawn_essential_handle(); + spawn_handle.spawn("task1", None, run_background_task(drop_tester.new_ref())); + spawn_handle.spawn("task2", None, run_background_task(drop_tester.new_ref())); + assert_eq!(drop_tester, 2); + // allow the tasks to even start + runtime.block_on(async { tokio::time::sleep(Duration::from_secs(1)).await }); + assert_eq!(drop_tester, 2); + spawn_essential_handle.spawn("task3", None, async { panic!("task failed") }); + runtime + .block_on(task_manager.future()) + .expect_err("future()'s Result must be Err"); + assert_eq!(drop_tester, 2); + } drop_tester.wait_on_drop(); } #[test] fn ensure_task_manager_future_ends_with_error_when_childs_essential_task_fails() { - let runtime = tokio::runtime::Runtime::new().unwrap(); - let handle = runtime.handle().clone(); - - let mut task_manager = new_task_manager(handle.clone()); - let child_1 = new_task_manager(handle.clone()); - let spawn_handle_child_1 = child_1.spawn_handle(); - let spawn_essential_handle_child_1 = child_1.spawn_essential_handle(); - let child_2 = new_task_manager(handle.clone()); - let spawn_handle_child_2 = child_2.spawn_handle(); - task_manager.add_child(child_1); - task_manager.add_child(child_2); - let spawn_handle = task_manager.spawn_handle(); let drop_tester = DropTester::new(); - spawn_handle.spawn("task1", None, run_background_task(drop_tester.new_ref())); - spawn_handle.spawn("task2", None, run_background_task(drop_tester.new_ref())); - spawn_handle_child_1.spawn("task3", None, run_background_task(drop_tester.new_ref())); - spawn_handle_child_2.spawn("task4", None, run_background_task(drop_tester.new_ref())); - assert_eq!(drop_tester, 4); - // allow the tasks to even start - runtime.block_on(async { tokio::time::sleep(Duration::from_secs(1)).await }); - assert_eq!(drop_tester, 4); - spawn_essential_handle_child_1.spawn("task5", None, async { panic!("task failed") }); - runtime - .block_on(task_manager.future()) - .expect_err("future()'s Result must be Err"); - assert_eq!(drop_tester, 4); - runtime.block_on(task_manager.clean_shutdown()); + { + let runtime = tokio::runtime::Runtime::new().unwrap(); + let handle = runtime.handle().clone(); + + let mut task_manager = new_task_manager(handle.clone()); + let child_1 = new_task_manager(handle.clone()); + let spawn_handle_child_1 = child_1.spawn_handle(); + let spawn_essential_handle_child_1 = child_1.spawn_essential_handle(); + let child_2 = new_task_manager(handle.clone()); + let spawn_handle_child_2 = child_2.spawn_handle(); + task_manager.add_child(child_1); + task_manager.add_child(child_2); + let spawn_handle = task_manager.spawn_handle(); + spawn_handle.spawn("task1", None, run_background_task(drop_tester.new_ref())); + spawn_handle.spawn("task2", None, run_background_task(drop_tester.new_ref())); + spawn_handle_child_1.spawn("task3", None, run_background_task(drop_tester.new_ref())); + spawn_handle_child_2.spawn("task4", None, run_background_task(drop_tester.new_ref())); + assert_eq!(drop_tester, 4); + // allow the tasks to even start + runtime.block_on(async { tokio::time::sleep(Duration::from_secs(1)).await }); + assert_eq!(drop_tester, 4); + spawn_essential_handle_child_1.spawn("task5", None, async { panic!("task failed") }); + runtime + .block_on(task_manager.future()) + .expect_err("future()'s Result must be Err"); + assert_eq!(drop_tester, 4); + } drop_tester.wait_on_drop(); } #[test] fn ensure_task_manager_future_continues_when_childs_not_essential_task_fails() { - let runtime = tokio::runtime::Runtime::new().unwrap(); - let handle = runtime.handle().clone(); - - let mut task_manager = new_task_manager(handle.clone()); - let child_1 = new_task_manager(handle.clone()); - let spawn_handle_child_1 = child_1.spawn_handle(); - let child_2 = new_task_manager(handle.clone()); - let spawn_handle_child_2 = child_2.spawn_handle(); - task_manager.add_child(child_1); - task_manager.add_child(child_2); - let spawn_handle = task_manager.spawn_handle(); let drop_tester = DropTester::new(); - spawn_handle.spawn("task1", None, run_background_task(drop_tester.new_ref())); - spawn_handle.spawn("task2", None, run_background_task(drop_tester.new_ref())); - spawn_handle_child_1.spawn("task3", None, run_background_task(drop_tester.new_ref())); - spawn_handle_child_2.spawn("task4", None, run_background_task(drop_tester.new_ref())); - assert_eq!(drop_tester, 4); - // allow the tasks to even start - runtime.block_on(async { tokio::time::sleep(Duration::from_secs(1)).await }); - assert_eq!(drop_tester, 4); - spawn_handle_child_1.spawn("task5", None, async { panic!("task failed") }); - runtime.block_on(async { - let t1 = task_manager.future().fuse(); - let t2 = tokio::time::sleep(Duration::from_secs(3)).fuse(); - - pin_mut!(t1, t2); - - select! { - res = t1 => panic!("task should not have stopped: {:?}", res), - _ = t2 => {}, - } - }); - assert_eq!(drop_tester, 4); - runtime.block_on(task_manager.clean_shutdown()); + { + let runtime = tokio::runtime::Runtime::new().unwrap(); + let handle = runtime.handle().clone(); + + let mut task_manager = new_task_manager(handle.clone()); + let child_1 = new_task_manager(handle.clone()); + let spawn_handle_child_1 = child_1.spawn_handle(); + let child_2 = new_task_manager(handle.clone()); + let spawn_handle_child_2 = child_2.spawn_handle(); + task_manager.add_child(child_1); + task_manager.add_child(child_2); + let spawn_handle = task_manager.spawn_handle(); + spawn_handle.spawn("task1", None, run_background_task(drop_tester.new_ref())); + spawn_handle.spawn("task2", None, run_background_task(drop_tester.new_ref())); + spawn_handle_child_1.spawn("task3", None, run_background_task(drop_tester.new_ref())); + spawn_handle_child_2.spawn("task4", None, run_background_task(drop_tester.new_ref())); + assert_eq!(drop_tester, 4); + // allow the tasks to even start + runtime.block_on(async { tokio::time::sleep(Duration::from_secs(1)).await }); + assert_eq!(drop_tester, 4); + spawn_handle_child_1.spawn("task5", None, async { panic!("task failed") }); + runtime.block_on(async { + let t1 = task_manager.future().fuse(); + let t2 = tokio::time::sleep(Duration::from_secs(3)).fuse(); + + pin_mut!(t1, t2); + + select! { + res = t1 => panic!("task should not have stopped: {:?}", res), + _ = t2 => {}, + } + }); + assert_eq!(drop_tester, 4); + } drop_tester.wait_on_drop(); } diff --git a/substrate/test-utils/test-runner/src/node.rs b/substrate/test-utils/test-runner/src/node.rs index 07259263c5e4d76febc080962d9d355b1d8cddcd..5fd8e4669c33dff1ac9c67cf8ff910b66c0aeb17 100644 --- a/substrate/test-utils/test-runner/src/node.rs +++ b/substrate/test-utils/test-runner/src/node.rs @@ -261,8 +261,6 @@ where let signal = tokio::signal::ctrl_c(); futures::pin_mut!(signal); futures::future::select(task, signal).await; - // we don't really care whichever comes first. - task_manager.clean_shutdown().await } } }