Skip to content
Snippets Groups Projects
Commit 25239c86 authored by Bastian Köcher's avatar Bastian Köcher Committed by GitHub
Browse files

Taskmanager: Remove `clean_shutdown` (#10314)

There is no reason for this function, tokio already blocks automatically until all tasks are ended.
Another reason to remove this feature is `mpsc_background_tasks` unbounded channel. Recently this
channel was reporting too many unprocessed elements. We assume that this was a result of a lot of
very shot lived tasks that somehow flooded this channel.
parent 5ea4823d
Branches
No related merge requests found
......@@ -98,7 +98,7 @@ where
pin_mut!(f);
tokio_runtime.block_on(main(f))?;
tokio_runtime.block_on(task_manager.clean_shutdown());
drop(task_manager);
Ok(())
}
......@@ -154,7 +154,6 @@ impl<C: SubstrateCli> Runner<C> {
self.print_node_infos();
let mut task_manager = self.tokio_runtime.block_on(initialize(self.config))?;
let res = self.tokio_runtime.block_on(main(task_manager.future().fuse()));
self.tokio_runtime.block_on(task_manager.clean_shutdown());
Ok(res?)
}
......
......@@ -21,17 +21,16 @@
use crate::{config::TaskType, Error};
use exit_future::Signal;
use futures::{
future::{join_all, pending, select, try_join_all, BoxFuture, Either},
future::{pending, select, try_join_all, BoxFuture, Either},
Future, FutureExt, StreamExt,
};
use log::debug;
use prometheus_endpoint::{
exponential_buckets, register, CounterVec, HistogramOpts, HistogramVec, Opts, PrometheusError,
Registry, U64,
};
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
use std::{panic, pin::Pin, result::Result};
use tokio::{runtime::Handle, task::JoinHandle};
use tokio::runtime::Handle;
use tracing_futures::Instrument;
mod prometheus_future;
......@@ -73,7 +72,6 @@ pub struct SpawnTaskHandle {
on_exit: exit_future::Exit,
tokio_handle: Handle,
metrics: Option<Metrics>,
task_notifier: TracingUnboundedSender<JoinHandle<()>>,
}
impl SpawnTaskHandle {
......@@ -113,11 +111,6 @@ impl SpawnTaskHandle {
task: impl Future<Output = ()> + Send + 'static,
task_type: TaskType,
) {
if self.task_notifier.is_closed() {
debug!("Attempt to spawn a new task has been prevented: {}", name);
return
}
let on_exit = self.on_exit.clone();
let metrics = self.metrics.clone();
......@@ -169,17 +162,17 @@ impl SpawnTaskHandle {
}
.in_current_span();
let join_handle = match task_type {
TaskType::Async => self.tokio_handle.spawn(future),
match task_type {
TaskType::Async => {
self.tokio_handle.spawn(future);
},
TaskType::Blocking => {
let handle = self.tokio_handle.clone();
self.tokio_handle.spawn_blocking(move || {
handle.block_on(future);
})
});
},
};
let _ = self.task_notifier.unbounded_send(join_handle);
}
}
}
......@@ -288,8 +281,8 @@ pub struct TaskManager {
/// A future that resolves when the service has exited, this is useful to
/// make sure any internally spawned futures stop when the service does.
on_exit: exit_future::Exit,
/// A signal that makes the exit future above resolve, fired on service drop.
signal: Option<Signal>,
/// A signal that makes the exit future above resolve, fired on drop.
_signal: Signal,
/// Tokio runtime handle that is used to spawn futures.
tokio_handle: Handle,
/// Prometheus metric where to report the polling times.
......@@ -301,10 +294,6 @@ pub struct TaskManager {
essential_failed_rx: TracingUnboundedReceiver<()>,
/// Things to keep alive until the task manager is dropped.
keep_alive: Box<dyn std::any::Any + Send>,
/// A sender to a stream of background tasks. This is used for the completion future.
task_notifier: TracingUnboundedSender<JoinHandle<()>>,
/// This future will complete when all the tasks are joined and the stream is closed.
completion_future: JoinHandle<()>,
/// A list of other `TaskManager`'s to terminate and gracefully shutdown when the parent
/// terminates and gracefully shutdown. Also ends the parent `future()` if a child's essential
/// task fails.
......@@ -325,25 +314,14 @@ impl TaskManager {
let metrics = prometheus_registry.map(Metrics::register).transpose()?;
let (task_notifier, background_tasks) = tracing_unbounded("mpsc_background_tasks");
// NOTE: for_each_concurrent will await on all the JoinHandle futures at the same time. It
// is possible to limit this but it's actually better for the memory foot print to await
// them all to not accumulate anything on that stream.
let completion_future =
tokio_handle.spawn(background_tasks.for_each_concurrent(None, |x| async move {
let _ = x.await;
}));
Ok(Self {
on_exit,
signal: Some(signal),
_signal: signal,
tokio_handle,
metrics,
essential_failed_tx,
essential_failed_rx,
keep_alive: Box::new(()),
task_notifier,
completion_future,
children: Vec::new(),
})
}
......@@ -354,7 +332,6 @@ impl TaskManager {
on_exit: self.on_exit.clone(),
tokio_handle: self.tokio_handle.clone(),
metrics: self.metrics.clone(),
task_notifier: self.task_notifier.clone(),
}
}
......@@ -363,36 +340,12 @@ impl TaskManager {
SpawnEssentialTaskHandle::new(self.essential_failed_tx.clone(), self.spawn_handle())
}
/// Send the signal for termination, prevent new tasks to be created, await for all the existing
/// tasks to be finished and drop the object. You can consider this as an async drop.
///
/// It's always better to call and await this function before exiting the process as background
/// tasks may be running in the background. If the process exit and the background tasks are not
/// cancelled, this will lead to objects not getting dropped properly.
///
/// This is an issue in some cases as some of our dependencies do require that we drop all the
/// objects properly otherwise it triggers a SIGABRT on exit.
pub fn clean_shutdown(mut self) -> Pin<Box<dyn Future<Output = ()> + Send>> {
self.terminate();
let children_shutdowns = self.children.into_iter().map(|x| x.clean_shutdown());
let keep_alive = self.keep_alive;
let completion_future = self.completion_future;
Box::pin(async move {
join_all(children_shutdowns).await;
let _ = completion_future.await;
let _ = keep_alive;
})
}
/// Return a future that will end with success if the signal to terminate was sent
/// (`self.terminate()`) or with an error if an essential task fails.
///
/// # Warning
///
/// This function will not wait until the end of the remaining task. You must call and await
/// `clean_shutdown()` after this.
/// This function will not wait until the end of the remaining task.
pub fn future<'a>(
&'a mut self,
) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'a>> {
......@@ -417,18 +370,6 @@ impl TaskManager {
})
}
/// Signal to terminate all the running tasks.
pub fn terminate(&mut self) {
if let Some(signal) = self.signal.take() {
let _ = signal.fire();
// NOTE: this will prevent new tasks to be spawned
self.task_notifier.close_channel();
for child in self.children.iter_mut() {
child.terminate();
}
}
}
/// Set what the task manager should keep alive, can be called multiple times.
pub fn keep_alive<T: 'static + Send>(&mut self, to_keep_alive: T) {
// allows this fn to safely called multiple times.
......
......@@ -90,225 +90,163 @@ fn new_task_manager(tokio_handle: tokio::runtime::Handle) -> TaskManager {
#[test]
fn ensure_tasks_are_awaited_on_shutdown() {
let runtime = tokio::runtime::Runtime::new().unwrap();
let handle = runtime.handle().clone();
let task_manager = new_task_manager(handle);
let spawn_handle = task_manager.spawn_handle();
let drop_tester = DropTester::new();
spawn_handle.spawn("task1", None, run_background_task(drop_tester.new_ref()));
spawn_handle.spawn("task2", None, run_background_task(drop_tester.new_ref()));
assert_eq!(drop_tester, 2);
// allow the tasks to even start
runtime.block_on(async { tokio::time::sleep(Duration::from_secs(1)).await });
assert_eq!(drop_tester, 2);
runtime.block_on(task_manager.clean_shutdown());
{
let runtime = tokio::runtime::Runtime::new().unwrap();
let handle = runtime.handle().clone();
let task_manager = new_task_manager(handle);
let spawn_handle = task_manager.spawn_handle();
spawn_handle.spawn("task1", None, run_background_task(drop_tester.new_ref()));
spawn_handle.spawn("task2", None, run_background_task(drop_tester.new_ref()));
assert_eq!(drop_tester, 2);
// allow the tasks to even start
runtime.block_on(async { tokio::time::sleep(Duration::from_secs(1)).await });
assert_eq!(drop_tester, 2);
}
drop_tester.wait_on_drop();
}
#[test]
fn ensure_keep_alive_during_shutdown() {
let runtime = tokio::runtime::Runtime::new().unwrap();
let handle = runtime.handle().clone();
let mut task_manager = new_task_manager(handle);
let spawn_handle = task_manager.spawn_handle();
let drop_tester = DropTester::new();
task_manager.keep_alive(drop_tester.new_ref());
spawn_handle.spawn("task1", None, run_background_task(()));
assert_eq!(drop_tester, 1);
// allow the tasks to even start
runtime.block_on(async { tokio::time::sleep(Duration::from_secs(1)).await });
assert_eq!(drop_tester, 1);
runtime.block_on(task_manager.clean_shutdown());
{
let runtime = tokio::runtime::Runtime::new().unwrap();
let handle = runtime.handle().clone();
let mut task_manager = new_task_manager(handle);
let spawn_handle = task_manager.spawn_handle();
task_manager.keep_alive(drop_tester.new_ref());
spawn_handle.spawn("task1", None, run_background_task(()));
assert_eq!(drop_tester, 1);
// allow the tasks to even start
runtime.block_on(async { tokio::time::sleep(Duration::from_secs(1)).await });
assert_eq!(drop_tester, 1);
}
drop_tester.wait_on_drop();
}
#[test]
fn ensure_blocking_futures_are_awaited_on_shutdown() {
let runtime = tokio::runtime::Runtime::new().unwrap();
let handle = runtime.handle().clone();
let task_manager = new_task_manager(handle);
let spawn_handle = task_manager.spawn_handle();
let drop_tester = DropTester::new();
spawn_handle.spawn(
"task1",
None,
run_background_task_blocking(Duration::from_secs(3), drop_tester.new_ref()),
);
spawn_handle.spawn(
"task2",
None,
run_background_task_blocking(Duration::from_secs(3), drop_tester.new_ref()),
);
assert_eq!(drop_tester, 2);
// allow the tasks to even start
runtime.block_on(async { tokio::time::sleep(Duration::from_secs(1)).await });
assert_eq!(drop_tester, 2);
runtime.block_on(task_manager.clean_shutdown());
assert_eq!(drop_tester, 0);
}
#[test]
fn ensure_no_task_can_be_spawn_after_terminate() {
let runtime = tokio::runtime::Runtime::new().unwrap();
let handle = runtime.handle().clone();
let mut task_manager = new_task_manager(handle);
let spawn_handle = task_manager.spawn_handle();
let drop_tester = DropTester::new();
spawn_handle.spawn("task1", None, run_background_task(drop_tester.new_ref()));
spawn_handle.spawn("task2", None, run_background_task(drop_tester.new_ref()));
assert_eq!(drop_tester, 2);
// allow the tasks to even start
runtime.block_on(async { tokio::time::sleep(Duration::from_secs(1)).await });
assert_eq!(drop_tester, 2);
task_manager.terminate();
spawn_handle.spawn("task3", None, run_background_task(drop_tester.new_ref()));
runtime.block_on(task_manager.clean_shutdown());
drop_tester.wait_on_drop();
}
#[test]
fn ensure_task_manager_future_ends_when_task_manager_terminated() {
let runtime = tokio::runtime::Runtime::new().unwrap();
let handle = runtime.handle().clone();
let mut task_manager = new_task_manager(handle);
let spawn_handle = task_manager.spawn_handle();
let drop_tester = DropTester::new();
spawn_handle.spawn("task1", None, run_background_task(drop_tester.new_ref()));
spawn_handle.spawn("task2", None, run_background_task(drop_tester.new_ref()));
assert_eq!(drop_tester, 2);
// allow the tasks to even start
runtime.block_on(async { tokio::time::sleep(Duration::from_secs(1)).await });
assert_eq!(drop_tester, 2);
task_manager.terminate();
runtime.block_on(task_manager.future()).expect("future has ended without error");
runtime.block_on(task_manager.clean_shutdown());
{
let runtime = tokio::runtime::Runtime::new().unwrap();
let handle = runtime.handle().clone();
let task_manager = new_task_manager(handle);
let spawn_handle = task_manager.spawn_handle();
spawn_handle.spawn(
"task1",
None,
run_background_task_blocking(Duration::from_secs(3), drop_tester.new_ref()),
);
spawn_handle.spawn(
"task2",
None,
run_background_task_blocking(Duration::from_secs(3), drop_tester.new_ref()),
);
assert_eq!(drop_tester, 2);
// allow the tasks to even start
runtime.block_on(async { tokio::time::sleep(Duration::from_secs(1)).await });
assert_eq!(drop_tester, 2);
}
assert_eq!(drop_tester, 0);
}
#[test]
fn ensure_task_manager_future_ends_with_error_when_essential_task_fails() {
let runtime = tokio::runtime::Runtime::new().unwrap();
let handle = runtime.handle().clone();
let mut task_manager = new_task_manager(handle);
let spawn_handle = task_manager.spawn_handle();
let spawn_essential_handle = task_manager.spawn_essential_handle();
let drop_tester = DropTester::new();
spawn_handle.spawn("task1", None, run_background_task(drop_tester.new_ref()));
spawn_handle.spawn("task2", None, run_background_task(drop_tester.new_ref()));
assert_eq!(drop_tester, 2);
// allow the tasks to even start
runtime.block_on(async { tokio::time::sleep(Duration::from_secs(1)).await });
assert_eq!(drop_tester, 2);
spawn_essential_handle.spawn("task3", None, async { panic!("task failed") });
runtime
.block_on(task_manager.future())
.expect_err("future()'s Result must be Err");
assert_eq!(drop_tester, 2);
runtime.block_on(task_manager.clean_shutdown());
drop_tester.wait_on_drop();
}
#[test]
fn ensure_children_tasks_ends_when_task_manager_terminated() {
let runtime = tokio::runtime::Runtime::new().unwrap();
let handle = runtime.handle().clone();
let mut task_manager = new_task_manager(handle.clone());
let child_1 = new_task_manager(handle.clone());
let spawn_handle_child_1 = child_1.spawn_handle();
let child_2 = new_task_manager(handle.clone());
let spawn_handle_child_2 = child_2.spawn_handle();
task_manager.add_child(child_1);
task_manager.add_child(child_2);
let spawn_handle = task_manager.spawn_handle();
let drop_tester = DropTester::new();
spawn_handle.spawn("task1", None, run_background_task(drop_tester.new_ref()));
spawn_handle.spawn("task2", None, run_background_task(drop_tester.new_ref()));
spawn_handle_child_1.spawn("task3", None, run_background_task(drop_tester.new_ref()));
spawn_handle_child_2.spawn("task4", None, run_background_task(drop_tester.new_ref()));
assert_eq!(drop_tester, 4);
// allow the tasks to even start
runtime.block_on(async { tokio::time::sleep(Duration::from_secs(1)).await });
assert_eq!(drop_tester, 4);
task_manager.terminate();
runtime.block_on(task_manager.future()).expect("future has ended without error");
runtime.block_on(task_manager.clean_shutdown());
{
let runtime = tokio::runtime::Runtime::new().unwrap();
let handle = runtime.handle().clone();
let mut task_manager = new_task_manager(handle);
let spawn_handle = task_manager.spawn_handle();
let spawn_essential_handle = task_manager.spawn_essential_handle();
spawn_handle.spawn("task1", None, run_background_task(drop_tester.new_ref()));
spawn_handle.spawn("task2", None, run_background_task(drop_tester.new_ref()));
assert_eq!(drop_tester, 2);
// allow the tasks to even start
runtime.block_on(async { tokio::time::sleep(Duration::from_secs(1)).await });
assert_eq!(drop_tester, 2);
spawn_essential_handle.spawn("task3", None, async { panic!("task failed") });
runtime
.block_on(task_manager.future())
.expect_err("future()'s Result must be Err");
assert_eq!(drop_tester, 2);
}
drop_tester.wait_on_drop();
}
#[test]
fn ensure_task_manager_future_ends_with_error_when_childs_essential_task_fails() {
let runtime = tokio::runtime::Runtime::new().unwrap();
let handle = runtime.handle().clone();
let mut task_manager = new_task_manager(handle.clone());
let child_1 = new_task_manager(handle.clone());
let spawn_handle_child_1 = child_1.spawn_handle();
let spawn_essential_handle_child_1 = child_1.spawn_essential_handle();
let child_2 = new_task_manager(handle.clone());
let spawn_handle_child_2 = child_2.spawn_handle();
task_manager.add_child(child_1);
task_manager.add_child(child_2);
let spawn_handle = task_manager.spawn_handle();
let drop_tester = DropTester::new();
spawn_handle.spawn("task1", None, run_background_task(drop_tester.new_ref()));
spawn_handle.spawn("task2", None, run_background_task(drop_tester.new_ref()));
spawn_handle_child_1.spawn("task3", None, run_background_task(drop_tester.new_ref()));
spawn_handle_child_2.spawn("task4", None, run_background_task(drop_tester.new_ref()));
assert_eq!(drop_tester, 4);
// allow the tasks to even start
runtime.block_on(async { tokio::time::sleep(Duration::from_secs(1)).await });
assert_eq!(drop_tester, 4);
spawn_essential_handle_child_1.spawn("task5", None, async { panic!("task failed") });
runtime
.block_on(task_manager.future())
.expect_err("future()'s Result must be Err");
assert_eq!(drop_tester, 4);
runtime.block_on(task_manager.clean_shutdown());
{
let runtime = tokio::runtime::Runtime::new().unwrap();
let handle = runtime.handle().clone();
let mut task_manager = new_task_manager(handle.clone());
let child_1 = new_task_manager(handle.clone());
let spawn_handle_child_1 = child_1.spawn_handle();
let spawn_essential_handle_child_1 = child_1.spawn_essential_handle();
let child_2 = new_task_manager(handle.clone());
let spawn_handle_child_2 = child_2.spawn_handle();
task_manager.add_child(child_1);
task_manager.add_child(child_2);
let spawn_handle = task_manager.spawn_handle();
spawn_handle.spawn("task1", None, run_background_task(drop_tester.new_ref()));
spawn_handle.spawn("task2", None, run_background_task(drop_tester.new_ref()));
spawn_handle_child_1.spawn("task3", None, run_background_task(drop_tester.new_ref()));
spawn_handle_child_2.spawn("task4", None, run_background_task(drop_tester.new_ref()));
assert_eq!(drop_tester, 4);
// allow the tasks to even start
runtime.block_on(async { tokio::time::sleep(Duration::from_secs(1)).await });
assert_eq!(drop_tester, 4);
spawn_essential_handle_child_1.spawn("task5", None, async { panic!("task failed") });
runtime
.block_on(task_manager.future())
.expect_err("future()'s Result must be Err");
assert_eq!(drop_tester, 4);
}
drop_tester.wait_on_drop();
}
#[test]
fn ensure_task_manager_future_continues_when_childs_not_essential_task_fails() {
let runtime = tokio::runtime::Runtime::new().unwrap();
let handle = runtime.handle().clone();
let mut task_manager = new_task_manager(handle.clone());
let child_1 = new_task_manager(handle.clone());
let spawn_handle_child_1 = child_1.spawn_handle();
let child_2 = new_task_manager(handle.clone());
let spawn_handle_child_2 = child_2.spawn_handle();
task_manager.add_child(child_1);
task_manager.add_child(child_2);
let spawn_handle = task_manager.spawn_handle();
let drop_tester = DropTester::new();
spawn_handle.spawn("task1", None, run_background_task(drop_tester.new_ref()));
spawn_handle.spawn("task2", None, run_background_task(drop_tester.new_ref()));
spawn_handle_child_1.spawn("task3", None, run_background_task(drop_tester.new_ref()));
spawn_handle_child_2.spawn("task4", None, run_background_task(drop_tester.new_ref()));
assert_eq!(drop_tester, 4);
// allow the tasks to even start
runtime.block_on(async { tokio::time::sleep(Duration::from_secs(1)).await });
assert_eq!(drop_tester, 4);
spawn_handle_child_1.spawn("task5", None, async { panic!("task failed") });
runtime.block_on(async {
let t1 = task_manager.future().fuse();
let t2 = tokio::time::sleep(Duration::from_secs(3)).fuse();
pin_mut!(t1, t2);
select! {
res = t1 => panic!("task should not have stopped: {:?}", res),
_ = t2 => {},
}
});
assert_eq!(drop_tester, 4);
runtime.block_on(task_manager.clean_shutdown());
{
let runtime = tokio::runtime::Runtime::new().unwrap();
let handle = runtime.handle().clone();
let mut task_manager = new_task_manager(handle.clone());
let child_1 = new_task_manager(handle.clone());
let spawn_handle_child_1 = child_1.spawn_handle();
let child_2 = new_task_manager(handle.clone());
let spawn_handle_child_2 = child_2.spawn_handle();
task_manager.add_child(child_1);
task_manager.add_child(child_2);
let spawn_handle = task_manager.spawn_handle();
spawn_handle.spawn("task1", None, run_background_task(drop_tester.new_ref()));
spawn_handle.spawn("task2", None, run_background_task(drop_tester.new_ref()));
spawn_handle_child_1.spawn("task3", None, run_background_task(drop_tester.new_ref()));
spawn_handle_child_2.spawn("task4", None, run_background_task(drop_tester.new_ref()));
assert_eq!(drop_tester, 4);
// allow the tasks to even start
runtime.block_on(async { tokio::time::sleep(Duration::from_secs(1)).await });
assert_eq!(drop_tester, 4);
spawn_handle_child_1.spawn("task5", None, async { panic!("task failed") });
runtime.block_on(async {
let t1 = task_manager.future().fuse();
let t2 = tokio::time::sleep(Duration::from_secs(3)).fuse();
pin_mut!(t1, t2);
select! {
res = t1 => panic!("task should not have stopped: {:?}", res),
_ = t2 => {},
}
});
assert_eq!(drop_tester, 4);
}
drop_tester.wait_on_drop();
}
......@@ -261,8 +261,6 @@ where
let signal = tokio::signal::ctrl_c();
futures::pin_mut!(signal);
futures::future::select(task, signal).await;
// we don't really care whichever comes first.
task_manager.clean_shutdown().await
}
}
}
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment