diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock
index c93a0b7fdd28ac3a3f82d585cb0948358deea39b..b2aca09fe25643c8ece2f2b55250e0f3462c2bed 100644
--- a/substrate/Cargo.lock
+++ b/substrate/Cargo.lock
@@ -7280,6 +7280,7 @@ dependencies = [
  "sp-keystore",
  "sp-panic-handler",
  "sp-runtime",
+ "sp-tracing",
  "sp-version",
  "tempfile",
  "thiserror",
diff --git a/substrate/client/cli/Cargo.toml b/substrate/client/cli/Cargo.toml
index a5de78da487c80727f28b835d665fe6e5f122097..ee4426b91dbb8213f1cc34b57f7db8915ca9650a 100644
--- a/substrate/client/cli/Cargo.toml
+++ b/substrate/client/cli/Cargo.toml
@@ -50,6 +50,7 @@ sp-version = { version = "5.0.0", path = "../../primitives/version" }
 [dev-dependencies]
 tempfile = "3.1.0"
 futures-timer = "3.0.1"
+sp-tracing = { version = "6.0.0", path = "../../primitives/tracing" }
 
 [features]
 default = ["rocksdb"]
diff --git a/substrate/client/cli/src/runner.rs b/substrate/client/cli/src/runner.rs
index d4191feddfa90f88f955d586220f3e357fcf61e3..1a532b3bbc6fbdb6eb54dc1ec5d10308e02fba4b 100644
--- a/substrate/client/cli/src/runner.rs
+++ b/substrate/client/cli/src/runner.rs
@@ -152,10 +152,38 @@ impl<C: SubstrateCli> Runner<C> {
 		//
 		// This is important to be done before we instruct the tokio runtime to shutdown. Otherwise
 		// the tokio runtime will wait the full 60 seconds for all tasks to stop.
-		drop(task_manager);
+		let task_registry = task_manager.into_task_registry();
 
 		// Give all futures 60 seconds to shutdown, before tokio "leaks" them.
-		self.tokio_runtime.shutdown_timeout(Duration::from_secs(60));
+		let shutdown_timeout = Duration::from_secs(60);
+		self.tokio_runtime.shutdown_timeout(shutdown_timeout);
+
+		let running_tasks = task_registry.running_tasks();
+
+		if !running_tasks.is_empty() {
+			log::error!("Detected running (potentially stalled) tasks on shutdown:");
+			running_tasks.iter().for_each(|(task, count)| {
+				let instances_desc =
+					if *count > 1 { format!("with {} instances ", count) } else { "".to_string() };
+
+				if task.is_default_group() {
+					log::error!(
+						"Task \"{}\" was still running {}after waiting {} seconds to finish.",
+						task.name,
+						instances_desc,
+						shutdown_timeout.as_secs(),
+					);
+				} else {
+					log::error!(
+						"Task \"{}\" (Group: {}) was still running {}after waiting {} seconds to finish.",
+						task.name,
+						task.group,
+						instances_desc,
+						shutdown_timeout.as_secs(),
+					);
+				}
+			});
+		}
 
 		res.map_err(Into::into)
 	}
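For illustration only (task names hypothetical): a stalled task "test" in the default group plus two stalled instances of a task "sync" in group "net" would produce the report below. Note the trailing space in `instances_desc`, which makes both message variants read correctly:

    Detected running (potentially stalled) tasks on shutdown:
    Task "test" was still running after waiting 60 seconds to finish.
    Task "sync" (Group: net) was still running with 2 instances after waiting 60 seconds to finish.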
@@ -388,34 +416,75 @@ mod tests {
 		assert!((count as u128) < (Duration::from_secs(30).as_millis() / 50));
 	}
 
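+	/// Re-run the current test binary in a child process so that `test_body` executes in a
+	/// separate process, and return the child's `Output` in the parent.
+	///
+	/// Returns `None` when already running inside the child (i.e. when `RUN_FORKED_TEST` is set),
+	/// in which case `test_body` is executed directly. The parent can use the returned `Output`
+	/// to assert on stderr/stdout written by the child.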
+	fn run_test_in_another_process(
+		test_name: &str,
+		test_body: impl FnOnce(),
+	) -> Option<std::process::Output> {
+		if std::env::var("RUN_FORKED_TEST").is_ok() {
+			test_body();
+			None
+		} else {
+			let output = std::process::Command::new(std::env::current_exe().unwrap())
+				.arg(test_name)
+				.env("RUN_FORKED_TEST", "1")
+				.output()
+				.unwrap();
+
+			assert!(output.status.success());
+			Some(output)
+		}
+	}
+
 	/// This test ensures that `run_node_until_exit` aborts waiting for "stuck" tasks after 60
 	/// seconds, aka doesn't wait until they are finished (which may never happen).
 	#[test]
 	fn ensure_run_until_exit_is_not_blocking_indefinitely() {
-		let runner = create_runner();
+		let output = run_test_in_another_process(
+			"ensure_run_until_exit_is_not_blocking_indefinitely",
+			|| {
+				sp_tracing::try_init_simple();
+
+				let runner = create_runner();
+
+				runner
+					.run_node_until_exit(move |cfg| async move {
+						let task_manager =
+							TaskManager::new(cfg.tokio_handle.clone(), None).unwrap();
+						let (sender, receiver) = futures::channel::oneshot::channel();
+
+						// We need to use `spawn_blocking` here so that we get a dedicated thread
+						// for our future. This future contains blocking code and will never end.
+						task_manager.spawn_handle().spawn_blocking("test", None, async move {
+							let _ = sender.send(());
+							loop {
+								std::thread::sleep(Duration::from_secs(30));
+							}
+						});
+
+						task_manager.spawn_essential_handle().spawn_blocking(
+							"test2",
+							None,
+							async {
+								// Let's stop this essential task as soon as our other task has
+								// started. It will signal that the task manager should end.
+								let _ = receiver.await;
+							},
+						);
+
+						Ok::<_, sc_service::Error>(task_manager)
+					})
+					.unwrap_err();
+			},
+		);
 
-		runner
-			.run_node_until_exit(move |cfg| async move {
-				let task_manager = TaskManager::new(cfg.tokio_handle.clone(), None).unwrap();
-				let (sender, receiver) = futures::channel::oneshot::channel();
+		let Some(output) = output else { return };
 
-				// We need to use `spawn_blocking` here so that we get a dedicated thread for our
-				// future. This future is more blocking code that will never end.
-				task_manager.spawn_handle().spawn_blocking("test", None, async move {
-					let _ = sender.send(());
-					loop {
-						std::thread::sleep(Duration::from_secs(30));
-					}
-				});
-
-				task_manager.spawn_essential_handle().spawn_blocking("test2", None, async {
-					// Let's stop this essential task directly when our other task started.
-					// It will signal that the task manager should end.
-					let _ = receiver.await;
-				});
+		let stderr = dbg!(String::from_utf8(output.stderr).unwrap());
 
-				Ok::<_, sc_service::Error>(task_manager)
-			})
-			.unwrap_err();
+		assert!(
+			stderr.contains("Task \"test\" was still running after waiting 60 seconds to finish.")
+		);
+		assert!(!stderr
+			.contains("Task \"test2\" was still running after waiting 60 seconds to finish."));
 	}
 }
diff --git a/substrate/client/service/src/lib.rs b/substrate/client/service/src/lib.rs
index 8b3a29ba4032a34f9bdae9c712fbfc72cf59d563..1529b822ade3256f64db48b1ed76dd320d7909e9 100644
--- a/substrate/client/service/src/lib.rs
+++ b/substrate/client/service/src/lib.rs
@@ -83,7 +83,7 @@ pub use sc_transaction_pool::Options as TransactionPoolOptions;
 pub use sc_transaction_pool_api::{error::IntoPoolError, InPoolTransaction, TransactionPool};
 #[doc(hidden)]
 pub use std::{ops::Deref, result::Result, sync::Arc};
-pub use task_manager::{SpawnTaskHandle, TaskManager, DEFAULT_GROUP_NAME};
+pub use task_manager::{SpawnTaskHandle, Task, TaskManager, TaskRegistry, DEFAULT_GROUP_NAME};
 
 const DEFAULT_PROTOCOL_ID: &str = "sup";
 
diff --git a/substrate/client/service/src/task_manager/mod.rs b/substrate/client/service/src/task_manager/mod.rs
index 23265f9672555d33492cf323308ead387ff90345..d792122576444324fef0d1ed05a9cd2d1369bc0e 100644
--- a/substrate/client/service/src/task_manager/mod.rs
+++ b/substrate/client/service/src/task_manager/mod.rs
@@ -24,12 +24,19 @@ use futures::{
 	future::{pending, select, try_join_all, BoxFuture, Either},
 	Future, FutureExt, StreamExt,
 };
+use parking_lot::Mutex;
 use prometheus_endpoint::{
 	exponential_buckets, register, CounterVec, HistogramOpts, HistogramVec, Opts, PrometheusError,
 	Registry, U64,
 };
 use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
-use std::{panic, pin::Pin, result::Result};
+use std::{
+	collections::{hash_map::Entry, HashMap},
+	panic,
+	pin::Pin,
+	result::Result,
+	sync::Arc,
+};
 use tokio::runtime::Handle;
 use tracing_futures::Instrument;
 
@@ -72,6 +79,7 @@ pub struct SpawnTaskHandle {
 	on_exit: exit_future::Exit,
 	tokio_handle: Handle,
 	metrics: Option<Metrics>,
+	task_registry: TaskRegistry,
 }
 
 impl SpawnTaskHandle {
@@ -113,6 +121,7 @@ impl SpawnTaskHandle {
 	) {
 		let on_exit = self.on_exit.clone();
 		let metrics = self.metrics.clone();
+		let registry = self.task_registry.clone();
 
 		let group = match group.into() {
 			GroupName::Specific(var) => var,
@@ -129,6 +138,10 @@ impl SpawnTaskHandle {
 		}
 
 		let future = async move {
+			// Register the task and keep the "token" alive until the task ends. When dropped,
+			// the "token" will unregister this task.
+			let _registry_token = registry.register_task(name, group);
+
 			if let Some(metrics) = metrics {
 				// Add some wrappers around `task`.
 				let task = {
@@ -298,6 +311,8 @@ pub struct TaskManager {
 	/// terminates and gracefully shutdown. Also ends the parent `future()` if a child's essential
 	/// task fails.
 	children: Vec<TaskManager>,
+	/// The registry of all running tasks.
+	task_registry: TaskRegistry,
 }
 
 impl TaskManager {
@@ -324,6 +339,7 @@ impl TaskManager {
 			essential_failed_rx,
 			keep_alive: Box::new(()),
 			children: Vec::new(),
+			task_registry: Default::default(),
 		})
 	}
 
@@ -333,6 +349,7 @@ impl TaskManager {
 			on_exit: self.on_exit.clone(),
 			tokio_handle: self.tokio_handle.clone(),
 			metrics: self.metrics.clone(),
+			task_registry: self.task_registry.clone(),
 		}
 	}
 
@@ -385,6 +402,14 @@ impl TaskManager {
 	pub fn add_child(&mut self, child: TaskManager) {
 		self.children.push(child);
 	}
+
+	/// Consumes `self` and returns the [`TaskRegistry`].
+	///
+	/// This [`TaskRegistry`] can be used to check for tasks that are still running after this
+	/// task manager was dropped.
+	pub fn into_task_registry(self) -> TaskRegistry {
+		self.task_registry
+	}
 }
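As a usage illustration only (not part of this diff; the function and its arguments are hypothetical), an embedder could consume the `TaskManager` into its `TaskRegistry` and report stalled tasks after the runtime has shut down, mirroring the `runner.rs` change above:

```rust
// Sketch: assumes the node was built with this `task_manager` and runs on `tokio_runtime`.
fn shutdown_and_report(
	tokio_runtime: tokio::runtime::Runtime,
	task_manager: sc_service::TaskManager,
) {
	// Keep only the registry; dropping the rest of the task manager signals all tasks to stop.
	let registry = task_manager.into_task_registry();

	// Give all futures a bounded amount of time to finish before tokio "leaks" them.
	tokio_runtime.shutdown_timeout(std::time::Duration::from_secs(60));

	// Anything still registered at this point never reacted to the shutdown signal.
	for (task, count) in registry.running_tasks() {
		eprintln!("Task `{}` (group `{}`): {} instance(s) still running", task.name, task.group, count);
	}
}
```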
 
 #[derive(Clone)]
@@ -434,3 +459,74 @@ impl Metrics {
 		})
 	}
 }
+
+/// Ensures that a [`Task`] is unregistered when this object is dropped.
+struct UnregisterOnDrop {
+	task: Task,
+	registry: TaskRegistry,
+}
+
+impl Drop for UnregisterOnDrop {
+	fn drop(&mut self) {
+		let mut tasks = self.registry.tasks.lock();
+
+		if let Entry::Occupied(mut entry) = (*tasks).entry(self.task.clone()) {
+			*entry.get_mut() -= 1;
+
+			if *entry.get() == 0 {
+				entry.remove();
+			}
+		}
+	}
+}
+
+/// Represents a running async task in the [`TaskManager`].
+///
+/// As a task is only identified by its name and group, it is perfectly valid for multiple tasks
+/// with the same name and group to exist.
+#[derive(Clone, Hash, Eq, PartialEq)]
+pub struct Task {
+	/// The name of the task.
+	pub name: &'static str,
+	/// The group this task is associated to.
+	/// The group this task is associated with.
+}
+
+impl Task {
+	/// Returns `true` if the `group` is the [`DEFAULT_GROUP_NAME`].
+	pub fn is_default_group(&self) -> bool {
+		self.group == DEFAULT_GROUP_NAME
+	}
+}
+
+/// Keeps track of all running [`Task`]s in [`TaskManager`].
+#[derive(Clone, Default)]
+pub struct TaskRegistry {
+	tasks: Arc<Mutex<HashMap<Task, usize>>>,
+}
+
+impl TaskRegistry {
+	/// Register a task with the given `name` and `group`.
+	///
+	/// Returns an [`UnregisterOnDrop`] token that ensures the task is unregistered when the
+	/// returned value is dropped.
+	fn register_task(&self, name: &'static str, group: &'static str) -> UnregisterOnDrop {
+		let task = Task { name, group };
+
+		{
+			let mut tasks = self.tasks.lock();
+
+			*(*tasks).entry(task.clone()).or_default() += 1;
+		}
+
+		UnregisterOnDrop { task, registry: self.clone() }
+	}
+
+	/// Returns the running tasks.
+	///
+	/// As a task is only identified by its `name` and `group`, there can be multiple tasks with
+	/// the same identifier. The count per task is the number of concurrently running tasks with
+	/// that identifier.
+	pub fn running_tasks(&self) -> HashMap<Task, usize> {
+		(*self.tasks.lock()).clone()
+	}
+}
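A further sketch (hypothetical, not part of this diff) of the counting semantics described in `running_tasks`: tasks spawned under the same name and group share one `Task` key whose count reflects the number of running instances.

```rust
// Sketch: `tokio_handle` is a handle to an already running tokio runtime.
fn spawn_duplicate_tasks(tokio_handle: tokio::runtime::Handle) -> sc_service::TaskManager {
	let task_manager = sc_service::TaskManager::new(tokio_handle, None).unwrap();
	let spawn = task_manager.spawn_handle();

	// Both tasks use the same name and the default group, so once they start executing they are
	// tracked under a single `Task { name: "example", group: DEFAULT_GROUP_NAME }` entry with a
	// count of 2. Each task's `UnregisterOnDrop` token decrements the count again when it ends.
	spawn.spawn("example", None, futures::future::pending::<()>());
	spawn.spawn("example", None, futures::future::pending::<()>());

	task_manager
}
```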